1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal;
17
18 import java.net.SocketAddress;
19 import java.nio.ByteBuffer;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import net.sf.beep4j.Channel;
31 import net.sf.beep4j.ChannelHandler;
32 import net.sf.beep4j.ChannelHandlerFactory;
33 import net.sf.beep4j.CloseChannelCallback;
34 import net.sf.beep4j.CloseChannelRequest;
35 import net.sf.beep4j.Message;
36 import net.sf.beep4j.MessageBuilder;
37 import net.sf.beep4j.ProfileInfo;
38 import net.sf.beep4j.ProtocolException;
39 import net.sf.beep4j.Reply;
40 import net.sf.beep4j.ReplyHandler;
41 import net.sf.beep4j.Session;
42 import net.sf.beep4j.SessionHandler;
43 import net.sf.beep4j.StartChannelRequest;
44 import net.sf.beep4j.StartSessionRequest;
45 import net.sf.beep4j.internal.message.DefaultMessageBuilder;
46 import net.sf.beep4j.internal.profile.BEEPError;
47 import net.sf.beep4j.internal.profile.ChannelManagementProfile;
48 import net.sf.beep4j.internal.profile.ChannelManagementProfileImpl;
49 import net.sf.beep4j.internal.profile.Greeting;
50 import net.sf.beep4j.internal.profile.StartChannelCallback;
51 import net.sf.beep4j.internal.util.Assert;
52 import net.sf.beep4j.internal.util.IntegerSequence;
53 import net.sf.beep4j.internal.util.Sequence;
54 import net.sf.beep4j.transport.TransportContext;
55
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class SessionImpl
72 implements MessageHandler, SessionManager, InternalSession, TransportContext, FrameHandlerFactory {
73
74 private final Logger LOG = LoggerFactory.getLogger(SessionImpl.class);
75
76 private final boolean initiator;
77
78 private final Map<Integer,Sequence<Integer>> messageNumberSequences = new HashMap<Integer,Sequence<Integer>>();
79
80 private final Map<Integer,LinkedList<ReplyHandlerHolder>> replyListeners = new HashMap<Integer,LinkedList<ReplyHandlerHolder>>();
81
82 private final Map<String,Reply> responseHandlers = new HashMap<String,Reply>();
83
84 private final Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
85
86 private final Map<Integer,ChannelHandler> channelHandlers = new HashMap<Integer,ChannelHandler>();
87
88 private final ChannelManagementProfile channelManagementProfile;
89
90 private final TransportMapping mapping;
91
92 private final SessionHandler sessionHandler;
93
94 private final Sequence<Integer> channelNumberSequence;
95
96 private final List<SessionListener> listeners = Collections.synchronizedList(new ArrayList<SessionListener>());
97
98 private final StreamParser parser;
99
100 private final Lock sessionLock = new ReentrantLock();
101
102 private SessionState currentState;
103
104 private SessionState initialState;
105
106 private SessionState aliveState;
107
108 private SessionState waitForResponseState;
109
110 private SessionState deadState;
111
112
113
114
115 private Greeting greeting;
116
117 public SessionImpl(boolean initiator, SessionHandler sessionHandler, TransportMapping mapping) {
118 Assert.notNull("sessionHandler", sessionHandler);
119 Assert.notNull("mapping", mapping);
120
121 this.initiator = initiator;
122 this.sessionHandler = new SessionHandlerWrapper(sessionHandler);
123 this.mapping = mapping;
124
125 addSessionListener(mapping);
126
127 DelegatingFrameHandler frameHandler = new DelegatingFrameHandler(this);
128 addSessionListener(frameHandler);
129
130 this.channelManagementProfile = createChannelManagementProfile(initiator);
131 initChannelManagementProfile();
132
133 this.channelNumberSequence = new IntegerSequence(initiator ? 1 : 2, 2);
134 this.parser = createStreamParser(frameHandler, mapping);
135
136 initialState = new InitialState();
137 aliveState = new AliveState();
138 waitForResponseState = new WaitForResponseState();
139 deadState = new DeadState();
140 currentState = initialState;
141 }
142
143 protected StreamParser createStreamParser(FrameHandler frameHandler, TransportMapping mapping) {
144 return new DefaultStreamParser(frameHandler, mapping);
145 }
146
147 protected ChannelManagementProfile createChannelManagementProfile(boolean initiator) {
148 return new ChannelManagementProfileImpl(initiator);
149 }
150
151 protected void initChannelManagementProfile() {
152 ChannelHandler channelHandler = channelManagementProfile.createChannelHandler(this);
153 InternalChannel channel = createChannel(this, "", 0);
154 channelHandler = initChannel(channel, channelHandler);
155 channelHandler.channelOpened(channel);
156 registerChannel(0, channel, channelHandler);
157 }
158
159 protected InternalChannel createChannel(InternalSession session, String profileUri, int channelNumber) {
160 return new ChannelImpl(session, profileUri, channelNumber);
161 }
162
163 private ChannelHandler initChannel(InternalChannel channel, ChannelHandler handler) {
164 return new ChannelHandlerWrapper(channel.initChannel(handler));
165 }
166
167 protected Reply createReply(TransportMapping mapping, int channelNumber, int messageNumber) {
168 Reply responseHandler = new DefaultReply(mapping, channelNumber, messageNumber);
169 setReply(channelNumber, messageNumber, responseHandler);
170 return responseHandler;
171 }
172
173 protected void lock() {
174 sessionLock.lock();
175 }
176
177 protected void unlock() {
178 sessionLock.unlock();
179 }
180
181 private String traceInfo() {
182 StringBuilder builder = new StringBuilder();
183 builder.append("[").append(System.identityHashCode(this));
184 builder.append("|").append(currentState);
185 builder.append("|").append(initiator).append("] ");
186 return builder.toString();
187 }
188
189 private void debug(Object... messages) {
190 if (LOG.isDebugEnabled()) {
191 StringBuffer buffer = new StringBuffer();
192 for (Object message : messages) {
193 buffer.append(message);
194 }
195 LOG.debug(buffer.toString());
196 }
197 }
198
199 private void info(String message) {
200 if (LOG.isInfoEnabled()) {
201 LOG.info(traceInfo() + message);
202 }
203 }
204
205 private void warn(String message, Exception e) {
206 LOG.warn(traceInfo() + message, e);
207 }
208
209 private void setCurrentState(SessionState currentState) {
210 debug("setting session state from ", this.currentState, " to ", currentState);
211 this.currentState = currentState;
212 }
213
214 private SessionState getCurrentState() {
215 return currentState;
216 }
217
218 protected void addSessionListener(SessionListener l) {
219 listeners.add(l);
220 }
221
222 protected void fireChannelStarted(int channel) {
223 SessionListener[] list = listeners.toArray(new SessionListener[listeners.size()]);
224 for (int i = 0; i < list.length; i++) {
225 SessionListener listener = list[i];
226 listener.channelStarted(channel);
227 }
228
229 }
230
231 protected void fireChannelClosed(int channel) {
232 SessionListener[] list = listeners.toArray(new SessionListener[listeners.size()]);
233 for (int i = 0; i < list.length; i++) {
234 SessionListener listener = list[i];
235 listener.channelClosed(channel);
236 }
237
238 }
239
240 private int getNextChannelNumber() {
241 return channelNumberSequence.next();
242 }
243
244 private void validateChannelNumber(int number) {
245 if (number <= 0) {
246 throw new ProtocolException(number + " is an illegal channel number");
247 }
248 if (initiator && number % 2 != 0) {
249 throw new ProtocolException("channel numbers of listener peer "
250 + "must be even numbered");
251 } else if (!initiator && number % 2 != 1) {
252 throw new ProtocolException("channel numbers of initator peer "
253 + "must be odd numbered");
254 }
255 }
256
257 private boolean hasOpenChannels() {
258 return channels.size() > 1;
259 }
260
261 private void registerChannel(int channelNumber, Channel channel, ChannelHandler handler) {
262 channels.put(channelNumber, channel);
263 channelHandlers.put(channelNumber, handler);
264 messageNumberSequences.put(channelNumber, new IntegerSequence(1, 1));
265 replyListeners.put(channelNumber, new LinkedList<ReplyHandlerHolder>());
266 fireChannelStarted(channelNumber);
267 }
268
269 private void unregisterChannel(int channelNumber) {
270 channels.remove(channelNumber);
271 channelHandlers.remove(channelNumber);
272 messageNumberSequences.remove(channelNumber);
273 replyListeners.remove(channelNumber);
274 fireChannelClosed(channelNumber);
275 }
276
277 private void registerReplyListener(int channelNumber, int messageNumber, ReplyHandler listener) {
278 LinkedList<ReplyHandlerHolder> expectedReplies = replyListeners.get(channelNumber);
279 expectedReplies.addLast(new ReplyHandlerHolder(messageNumber, listener));
280 }
281
282 private class ReplyHandlerHolder {
283 private final int messageNumber;
284 private final ReplyHandler replyHandler;
285 protected ReplyHandlerHolder(int messageNumber, ReplyHandler listener) {
286 this.messageNumber = messageNumber;
287 this.replyHandler = listener;
288 }
289 protected void receivedANS(int channelNumber, int messageNumber, Message message) {
290 validateMessageNumber(channelNumber, messageNumber);
291 unlock();
292 try {
293 replyHandler.receivedANS(message);
294 } finally {
295 lock();
296 }
297 }
298 protected void receivedNUL(int channelNumber, int messageNumber) {
299 validateMessageNumber(channelNumber, messageNumber);
300 unlock();
301 try {
302 replyHandler.receivedNUL();
303 } finally {
304 lock();
305 }
306 }
307 protected void receivedERR(int channelNumber, int messageNumber, Message message) {
308 validateMessageNumber(channelNumber, messageNumber);
309 unlock();
310 try {
311 replyHandler.receivedERR(message);
312 } finally {
313 lock();
314 }
315 }
316 protected void receivedRPY(int channelNumber, int messageNumber, Message message) {
317 validateMessageNumber(channelNumber, messageNumber);
318 unlock();
319 try {
320 replyHandler.receivedRPY(message);
321 } finally {
322 lock();
323 }
324 }
325 private void validateMessageNumber(int channelNumber, int messageNumber) {
326 if (this.messageNumber != messageNumber) {
327 throw new ProtocolException("next expected reply on channel "
328 + channelNumber + " must have message number "
329 + this.messageNumber + " but was "
330 + messageNumber);
331 }
332 }
333 }
334
335 private ReplyHandlerHolder unregisterReplyListener(int channelNumber) {
336 LinkedList<ReplyHandlerHolder> listeners = replyListeners.get(channelNumber);
337 return listeners.removeFirst();
338 }
339
340 private int getNextMessageNumber(int channelNumber) {
341 Sequence<Integer> sequence = getMessageNumberSequence(channelNumber);
342 Integer next = sequence.next();
343 return next;
344 }
345
346 private Sequence<Integer> getMessageNumberSequence(int channelNumber) {
347 Sequence<Integer> result = messageNumberSequences.get(channelNumber);
348 if (result == null) {
349 throw new InternalException("no open channel with channel number " + channelNumber);
350 }
351 return result;
352 }
353
354 private ReplyHandlerHolder getReplyListener(int channelNumber, int messageNumber) {
355 LinkedList<ReplyHandlerHolder> listeners = replyListeners.get(channelNumber);
356 if (listeners.isEmpty()) {
357 throw new ProtocolException("received a reply but expects no outstanding replies");
358 }
359 return listeners.getFirst();
360 }
361
362 private Reply getReply(int channelNumber, int messageNumber) {
363 Reply handler = responseHandlers.get(key(channelNumber, messageNumber));
364 return handler;
365 }
366
367 private void setReply(int channelNumber, int messageNumber, Reply responseHandler) {
368 responseHandlers.put(key(channelNumber, messageNumber), responseHandler);
369 }
370
371 private ChannelHandler getChannelHandler(int channelNumber) {
372 return channelHandlers.get(channelNumber);
373 }
374
375 private String key(int channelNumber, int messageNumber) {
376 return channelNumber + ":" + messageNumber;
377 }
378
379 private void replyCompleted(int channelNumber, int messageNumber) {
380 responseHandlers.remove(key(channelNumber, messageNumber));
381 }
382
383
384
385
386 public String[] getProfiles() {
387 if (greeting == null) {
388 throw new IllegalStateException("greeting has not yet been received");
389 }
390 return greeting.getProfiles();
391 }
392
393 public void startChannel(String profileUri, ChannelHandler handler) {
394 startChannel(new ProfileInfo(profileUri), handler);
395 }
396
397 public void startChannel(final ProfileInfo profile, final ChannelHandler handler) {
398 startChannel(new ProfileInfo[] { profile }, new ChannelHandlerFactory() {
399 public ChannelHandler createChannelHandler(ProfileInfo info) {
400 if (!profile.getUri().equals(info.getUri())) {
401 throw new IllegalArgumentException("profile URIs do not match: "
402 + profile.getUri() + " | " + info.getUri());
403 }
404 return handler;
405 }
406 public void startChannelFailed(int code, String message) {
407 unlock();
408 try {
409 handler.channelStartFailed(code, message);
410 } finally {
411 lock();
412 }
413 }
414 });
415 }
416
417 public void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory) {
418 lock();
419 try {
420 getCurrentState().startChannel(profiles, factory);
421 } finally {
422 unlock();
423 }
424 }
425
426 public void close() {
427 lock();
428 try {
429 getCurrentState().closeSession();
430 } finally {
431 unlock();
432 }
433 }
434
435
436
437
438
439
440
441
442
443
444
445
446
447 public void sendMessage(int channelNumber, Message message, ReplyHandler listener) {
448 lock();
449 try {
450 getCurrentState().sendMessage(channelNumber, message, listener);
451 } finally {
452 unlock();
453 }
454 }
455
456
457
458
459
460 public void requestChannelClose(final int channelNumber, final CloseChannelCallback callback) {
461 Assert.notNull("callback", callback);
462 lock();
463 try {
464 channelManagementProfile.closeChannel(channelNumber, new CloseChannelCallback() {
465 public void closeDeclined(int code, String message) {
466 callback.closeDeclined(code, message);
467 }
468 public void closeAccepted() {
469 unregisterChannel(channelNumber);
470 callback.closeAccepted();
471 }
472 });
473 } finally {
474 unlock();
475 }
476 }
477
478
479
480
481
482
483 public FrameHandler createFrameHandler() {
484 return new MessageAssembler(this);
485 }
486
487
488
489
490
491
492
493
494
495
496 public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
497 return getCurrentState().channelStartRequested(channelNumber, profiles);
498 }
499
500
501
502
503
504
505
506 public void channelCloseRequested(final int channelNumber, final CloseChannelRequest request) {
507 lock();
508 try {
509 ChannelHandler handler = getChannelHandler(channelNumber);
510 handler.channelCloseRequested(new CloseChannelRequest() {
511 public void reject() {
512 request.reject();
513 }
514 public void accept() {
515 request.accept();
516 unregisterChannel(channelNumber);
517 }
518 });
519 } finally {
520 unlock();
521 }
522 }
523
524 public void sessionCloseRequested(CloseCallback callback) {
525 lock();
526 try {
527 getCurrentState().sessionCloseRequested(callback);
528 } finally {
529 unlock();
530 }
531 }
532
533
534
535
536
537
538 public void receiveMSG(int channelNumber, int messageNumber, Message message) {
539 debug("received MSG: channel=", channelNumber, ",message=", messageNumber);
540 getCurrentState().receiveMSG(channelNumber, messageNumber, message);
541 }
542
543 public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
544 debug("received ANS: channel=", channelNumber, ",message=", messageNumber, ",answer=", answerNumber);
545 getCurrentState().receiveANS(channelNumber, messageNumber, answerNumber, message);
546 }
547
548 public void receiveNUL(int channelNumber, int messageNumber) {
549 debug("received NUL: channel=", channelNumber, ",message=", messageNumber);
550 getCurrentState().receiveNUL(channelNumber, messageNumber);
551 }
552
553 public void receiveERR(int channelNumber, int messageNumber, Message message) {
554 debug("received ERR: channel=", channelNumber, ",message=", messageNumber);
555 getCurrentState().receiveERR(channelNumber, messageNumber, message);
556 }
557
558 public void receiveRPY(int channelNumber, int messageNumber, Message message) {
559 debug("received RPY: channel=", channelNumber, ",message=", messageNumber);
560 getCurrentState().receiveRPY(channelNumber, messageNumber, message);
561 }
562
563
564
565
566
567 private class ChannelHandlerWrapper implements ChannelHandler {
568
569 private final ChannelHandler target;
570
571 private ChannelHandlerWrapper(ChannelHandler target) {
572 Assert.notNull("target", target);
573 this.target = target;
574 }
575
576 public void channelOpened(Channel c) {
577 unlock();
578 try {
579 target.channelOpened(c);
580 } finally {
581 lock();
582 }
583 }
584
585 public void channelStartFailed(int code, String message) {
586 unlock();
587 try {
588 target.channelStartFailed(code, message);
589 } finally {
590 lock();
591 }
592 }
593
594 public void messageReceived(Message message, Reply reply) {
595 unlock();
596 try {
597 target.messageReceived(message, reply);
598 } finally {
599 lock();
600 }
601 }
602
603 public void channelCloseRequested(CloseChannelRequest request) {
604 unlock();
605 try {
606 target.channelCloseRequested(request);
607 } finally {
608 lock();
609 }
610 }
611
612 public void channelClosed() {
613 unlock();
614 try {
615 target.channelClosed();
616 } finally {
617 lock();
618 }
619 }
620
621 }
622
623 private class SessionHandlerWrapper implements SessionHandler {
624
625 private final SessionHandler target;
626
627 private SessionHandlerWrapper(SessionHandler target) {
628 this.target = target;
629 }
630
631 public void connectionEstablished(StartSessionRequest s) {
632 unlock();
633 try {
634 target.connectionEstablished(s);
635 } finally {
636 lock();
637 }
638 }
639
640 public void sessionOpened(Session s) {
641 unlock();
642 try {
643 target.sessionOpened(s);
644 } finally {
645 lock();
646 }
647 }
648
649 public void sessionStartDeclined(int code, String message) {
650 unlock();
651 try {
652 target.sessionStartDeclined(code, message);
653 } finally {
654 lock();
655 }
656 }
657
658 public void channelStartRequested(StartChannelRequest request) {
659 unlock();
660 try {
661 target.channelStartRequested(request);
662 } finally {
663 lock();
664 }
665 }
666
667 public void sessionClosed() {
668 unlock();
669 try {
670 target.sessionClosed();
671 } finally {
672 lock();
673 }
674 }
675
676 }
677
678
679
680
681
682
683
684
685 public void connectionEstablished(SocketAddress address) {
686 lock();
687 try {
688 getCurrentState().connectionEstablished(address);
689 } finally {
690 unlock();
691 }
692 }
693
694 public void exceptionCaught(Throwable cause) {
695
696 LOG.warn("exception caught by transport", cause);
697 }
698
699 public void messageReceived(ByteBuffer buffer) {
700 lock();
701 try {
702 parser.process(buffer);
703 } catch (ProtocolException e) {
704 warn("dropping connection because of a protocol exception", e);
705 try {
706 sessionHandler.sessionClosed();
707 } finally {
708 setCurrentState(deadState);
709 mapping.closeTransport();
710 }
711 } finally {
712 unlock();
713 }
714 }
715
716 public void connectionClosed() {
717 lock();
718 try {
719 getCurrentState().connectionClosed();
720 } finally {
721 unlock();
722 }
723 }
724
725
726
727 protected static interface SessionState extends MessageHandler {
728
729 void connectionEstablished(SocketAddress address);
730
731 void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory);
732
733 void sendMessage(int channelNumber, Message message, ReplyHandler listener);
734
735 StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles);
736
737 void closeSession();
738
739 void sessionCloseRequested(CloseCallback callback);
740
741 void connectionClosed();
742
743 }
744
745 protected abstract class AbstractSessionState implements SessionState {
746
747 public abstract String getName();
748
749 public void connectionEstablished(SocketAddress address) {
750 throw new IllegalStateException("connection already established, state=<"
751 + getName() + ">");
752 }
753
754 public void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory) {
755 throw new IllegalStateException("" +
756 "cannot start channel in state <" + getName() + ">");
757 }
758
759 public void sendMessage(int channelNumber, Message message, ReplyHandler listener) {
760 throw new IllegalStateException(
761 "cannot send messages in state <" + getName() + ">: channel="
762 + channelNumber);
763 }
764
765 public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
766 return StartChannelResponse.createCancelledResponse(550, "cannot start channel");
767 }
768
769 public void receiveANS(int channelNumber, int messageNumber,
770 int answerNumber, Message message) {
771 throw new IllegalStateException(
772 "internal error: unexpected method invocation in state <" + getName() + ">: "
773 + "message ANS, channel=" + channelNumber
774 + ",message=" + messageNumber
775 + ",answerNumber=" + answerNumber);
776 }
777
778 public void receiveERR(int channelNumber, int messageNumber, Message message) {
779 throw new IllegalStateException(
780 "internal error: unexpected method invocation in state <" + getName() + ">: "
781 + "message ERR, channel=" + channelNumber + ",message=" + messageNumber);
782 }
783
784 public void receiveMSG(int channelNumber, int messageNumber, Message message) {
785 throw new IllegalStateException(
786 "internal error: unexpected method invocation in state <" + getName() + ">: "
787 + "message MSG, channel=" + channelNumber + ",message=" + messageNumber);
788 }
789
790 public void receiveNUL(int channelNumber, int messageNumber) {
791 throw new IllegalStateException(
792 "internal error: unexpected method invocation in state <" + getName() + ">: "
793 + "message NUL, channel=" + channelNumber + ",message=" + messageNumber);
794 }
795
796 public void receiveRPY(int channelNumber, int messageNumber, Message message) {
797 throw new IllegalStateException(
798 "internal error: unexpected method invocation in state <" + getName() + ">: "
799 + "message RPY, channel=" + channelNumber + ",message=" + messageNumber);
800 }
801
802 public void closeSession() {
803 throw new IllegalStateException("cannot close session");
804 }
805
806 public void sessionCloseRequested(CloseCallback callback) {
807 throw new IllegalStateException("cannot close session");
808 }
809
810 }
811
812 protected class InitialState extends AbstractSessionState {
813
814 @Override
815 public String getName() {
816 return "initial";
817 }
818
819 public void connectionEstablished(SocketAddress address) {
820 Reply reply = new InitialReply(mapping);
821 setReply(0, 0, reply);
822 if (!channelManagementProfile.connectionEstablished(address, sessionHandler, reply)) {
823 setCurrentState(deadState);
824 mapping.closeTransport();
825 }
826 }
827
828 public void receiveMSG(int channelNumber, int messageNumber, Message message) {
829 throw new ProtocolException(
830 "first message in a session must be RPY or ERR on channel 0: "
831 + "was MSG channel=" + channelNumber + ",message=" + messageNumber);
832 }
833
834 public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
835 throw new ProtocolException(
836 "first message in a session must be RPY or ERR on channel 0: "
837 + "was ANS channel=" + channelNumber + ",message=" + messageNumber);
838 }
839
840 public void receiveNUL(int channelNumber, int messageNumber) {
841 throw new ProtocolException(
842 "first message in a session must be RPY or ERR on channel 0: "
843 + "was NUL channel=" + channelNumber + ",message=" + messageNumber);
844 }
845
846 public void receiveRPY(int channelNumber, int messageNumber, Message message) {
847 validateMessage(channelNumber, messageNumber);
848 greeting = channelManagementProfile.receivedGreeting(message);
849 setCurrentState(aliveState);
850 sessionHandler.sessionOpened(SessionImpl.this);
851 }
852
853 public void receiveERR(int channelNumber, int messageNumber, Message message) {
854 validateMessage(channelNumber, messageNumber);
855 BEEPError error = channelManagementProfile.receivedError(message);
856
857 info("received error, session start failed: " + error.getCode() + ":"
858 + error.getMessage());
859
860 sessionHandler.sessionStartDeclined(error.getCode(), error.getMessage());
861 setCurrentState(deadState);
862 mapping.closeTransport();
863 }
864
865 private void validateMessage(int channelNumber, int messageNumber) {
866 if (channelNumber != 0 || messageNumber != 0) {
867 throw new ProtocolException("first message in session must be sent on "
868 + "channel 0 with message number 0: was channel " + channelNumber
869 + ",message=" + messageNumber);
870 }
871 }
872
873 public void connectionClosed() {
874 setCurrentState(deadState);
875 }
876
877 @Override
878 public String toString() {
879 return "<initial>";
880 }
881
882 }
883
884 protected class AliveState extends AbstractSessionState {
885
886 @Override
887 public String getName() {
888 return "alive";
889 }
890
891 @Override
892 public void startChannel(final ProfileInfo[] profiles, final ChannelHandlerFactory factory) {
893 final int channelNumber = getNextChannelNumber();
894 channelManagementProfile.startChannel(channelNumber, profiles, new StartChannelCallback() {
895 public void channelCreated(ProfileInfo info) {
896 ChannelHandler handler = factory.createChannelHandler(info);
897 InternalChannel channel = createChannel(
898 SessionImpl.this, info.getUri(), channelNumber);
899 ChannelHandler channelHandler = initChannel(channel, handler);
900 registerChannel(channelNumber, channel, channelHandler);
901 channelHandler.channelOpened(channel);
902 }
903 public void channelFailed(int code, String message) {
904 factory.startChannelFailed(code, message);
905 }
906 });
907 }
908
909 @Override
910 public void sendMessage(int channelNumber, Message message, ReplyHandler listener) {
911 int messageNumber = getNextMessageNumber(channelNumber);
912 debug("send message: channel=", channelNumber, ",message=", messageNumber);
913 registerReplyListener(channelNumber, messageNumber, listener);
914 mapping.sendMSG(channelNumber, messageNumber, message);
915 }
916
917 @Override
918 public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
919 validateChannelNumber(channelNumber);
920
921 debug("start of channel ", channelNumber, " requested by remote peer: ", Arrays.toString(profiles));
922 DefaultStartChannelRequest request = new DefaultStartChannelRequest(profiles);
923 sessionHandler.channelStartRequested(request);
924
925 StartChannelResponse response = request.getResponse();
926
927 if (response.isCancelled()) {
928 debug("start of channel ", channelNumber, " is cancelled by application: ", response.getCode(),
929 ",'", response.getMessage(), "'");
930 return response;
931 }
932
933 ProfileInfo info = response.getProfile();
934 if (info == null) {
935 throw new ProtocolException("StartChannelRequest must be either cancelled or a profile must be selected");
936 }
937
938 debug("start of channel ", channelNumber, " is accepted by application: ", info.getUri());
939
940 InternalChannel channel = createChannel(SessionImpl.this, info.getUri(), channelNumber);
941 ChannelHandler handler = channel.initChannel(response.getChannelHandler());
942 handler.channelOpened(channel);
943 registerChannel(channelNumber, channel, handler);
944
945 return response;
946 }
947
948 @Override
949 public void receiveMSG(int channelNumber, int messageNumber, Message message) {
950 Reply reply = getReply(channelNumber, messageNumber);
951 if (reply != null) {
952
953
954
955
956
957 throw new ProtocolException("Message number " + messageNumber
958 + " on channel " + channelNumber + " refers to a MSG message "
959 + "that has been received but for which a reply has not been "
960 + "completely sent.");
961 }
962 reply = createReply(mapping, channelNumber, messageNumber);
963
964 ChannelHandler handler = getChannelHandler(channelNumber);
965 handler.messageReceived(message, reply);
966 }
967
968 @Override
969 public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
970 ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
971 listener.receivedANS(channelNumber, messageNumber, message);
972 }
973
974 @Override
975 public void receiveNUL(int channelNumber, int messageNumber) {
976 ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
977 try {
978 listener.receivedNUL(channelNumber, messageNumber);
979 } finally {
980 unregisterReplyListener(channelNumber);
981 }
982 }
983
984 @Override
985 public void receiveERR(int channelNumber, int messageNumber, Message message) {
986 ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
987 try {
988 listener.receivedERR(channelNumber, messageNumber, message);
989 } finally {
990 unregisterReplyListener(channelNumber);
991 }
992 }
993
994 @Override
995 public void receiveRPY(int channelNumber, int messageNumber, Message message) {
996 ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
997 try {
998 listener.receivedRPY(channelNumber, messageNumber, message);
999 } finally {
1000 unregisterReplyListener(channelNumber);
1001 }
1002 }
1003
1004 @Override
1005 public void closeSession() {
1006 setCurrentState(waitForResponseState);
1007 channelManagementProfile.closeSession(new CloseCallback() {
1008 public void closeDeclined(int code, String message) {
1009 debug("close session declined by remote peer: " + code + ":" + message);
1010 performClose();
1011 }
1012
1013 public void closeAccepted() {
1014 debug("close session accepted by remote peer");
1015 performClose();
1016 }
1017
1018 private void performClose() {
1019 try {
1020 sessionHandler.sessionClosed();
1021 } finally {
1022 setCurrentState(deadState);
1023 mapping.closeTransport();
1024 }
1025 }
1026 });
1027 }
1028
1029 @Override
1030 public void sessionCloseRequested(CloseCallback callback) {
1031 if (hasOpenChannels()) {
1032 callback.closeDeclined(550, "still working");
1033 } else {
1034 callback.closeAccepted();
1035 try {
1036 sessionHandler.sessionClosed();
1037 } finally {
1038 setCurrentState(deadState);
1039 mapping.closeTransport();
1040 }
1041 }
1042 }
1043
1044 public void connectionClosed() {
1045 try {
1046 sessionHandler.sessionClosed();
1047 } finally {
1048 setCurrentState(deadState);
1049 }
1050 }
1051
1052 @Override
1053 public String toString() {
1054 return "<alive>";
1055 }
1056
1057 }
1058
1059 protected class WaitForResponseState extends AliveState {
1060
1061 @Override
1062 public String getName() {
1063 return "close-initiated";
1064 }
1065
1066 @Override
1067 public void sessionCloseRequested(CloseCallback callback) {
1068 debug("received session close request while close is already in progress");
1069 try {
1070 sessionHandler.sessionClosed();
1071 } finally {
1072 callback.closeAccepted();
1073 mapping.closeTransport();
1074 setCurrentState(deadState);
1075 }
1076 }
1077
1078 @Override
1079 public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
1080 return StartChannelResponse.createCancelledResponse(550, "session release in progress");
1081 }
1082
1083 @Override
1084 public String toString() {
1085 return "<wait-for-response>";
1086 }
1087
1088 }
1089
1090 protected class DeadState extends AbstractSessionState {
1091
1092 @Override
1093 public String getName() {
1094 return "dead";
1095 }
1096
1097 public void connectionClosed() {
1098
1099 }
1100
1101 @Override
1102 public String toString() {
1103 return "<dead>";
1104 }
1105
1106 }
1107
1108 protected class DefaultReply implements Reply {
1109
1110 private final TransportMapping mapping;
1111
1112 private final int channel;
1113
1114 private final int messageNumber;
1115
1116 private int answerNumber = 0;
1117
1118 private boolean complete;
1119
1120 public DefaultReply(TransportMapping mapping, int channel, int messageNumber) {
1121 Assert.notNull("mapping", mapping);
1122 this.mapping = mapping;
1123 this.channel = channel;
1124 this.messageNumber = messageNumber;
1125 }
1126
1127 private void checkCompletion() {
1128 if (complete) {
1129 throw new IllegalStateException("a complete reply has already been sent");
1130 }
1131 }
1132
1133 private void complete() {
1134 complete = true;
1135 replyCompleted(channel, messageNumber);
1136 }
1137
1138 public MessageBuilder createMessageBuilder() {
1139 return new DefaultMessageBuilder();
1140 }
1141
1142 public void sendANS(Message message) {
1143 Assert.notNull("message", message);
1144 lock();
1145 try {
1146 checkCompletion();
1147 mapping.sendANS(channel, messageNumber, answerNumber++, message);
1148 } finally {
1149 unlock();
1150 }
1151 }
1152
1153 public void sendERR(Message message) {
1154 Assert.notNull("message", message);
1155 lock();
1156 try {
1157 checkCompletion();
1158 mapping.sendERR(channel, messageNumber, message);
1159 complete();
1160 } finally {
1161 unlock();
1162 }
1163 }
1164
1165 public void sendNUL() {
1166 lock();
1167 try {
1168 checkCompletion();
1169 mapping.sendNUL(channel, messageNumber);
1170 complete();
1171 } finally {
1172 unlock();
1173 }
1174 }
1175
1176 public void sendRPY(Message message) {
1177 Assert.notNull("message", message);
1178 lock();
1179 try {
1180 checkCompletion();
1181 mapping.sendRPY(channel, messageNumber, message);
1182 complete();
1183 } finally {
1184 unlock();
1185 }
1186 }
1187
1188 }
1189
1190 protected class InitialReply extends DefaultReply {
1191
1192 public InitialReply(TransportMapping mapping) {
1193 super(mapping, 0, 0);
1194 }
1195
1196 @Override
1197 public void sendANS(Message message) {
1198 throw new InternalException("ANS is not a valid initial response");
1199 }
1200
1201 @Override
1202 public void sendNUL() {
1203 throw new InternalException("NUL is not a valid initial response");
1204 }
1205
1206 }
1207
1208 }