View Javadoc

1   /*
2    *  Copyright 2006 Simon Raess
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.beep4j.internal;
17  
18  import java.net.SocketAddress;
19  import java.nio.ByteBuffer;
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.locks.Lock;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import net.sf.beep4j.Channel;
31  import net.sf.beep4j.ChannelHandler;
32  import net.sf.beep4j.ChannelHandlerFactory;
33  import net.sf.beep4j.CloseChannelCallback;
34  import net.sf.beep4j.CloseChannelRequest;
35  import net.sf.beep4j.Message;
36  import net.sf.beep4j.MessageBuilder;
37  import net.sf.beep4j.ProfileInfo;
38  import net.sf.beep4j.ProtocolException;
39  import net.sf.beep4j.Reply;
40  import net.sf.beep4j.ReplyHandler;
41  import net.sf.beep4j.Session;
42  import net.sf.beep4j.SessionHandler;
43  import net.sf.beep4j.StartChannelRequest;
44  import net.sf.beep4j.StartSessionRequest;
45  import net.sf.beep4j.internal.message.DefaultMessageBuilder;
46  import net.sf.beep4j.internal.profile.BEEPError;
47  import net.sf.beep4j.internal.profile.ChannelManagementProfile;
48  import net.sf.beep4j.internal.profile.ChannelManagementProfileImpl;
49  import net.sf.beep4j.internal.profile.Greeting;
50  import net.sf.beep4j.internal.profile.StartChannelCallback;
51  import net.sf.beep4j.internal.util.Assert;
52  import net.sf.beep4j.internal.util.IntegerSequence;
53  import net.sf.beep4j.internal.util.Sequence;
54  import net.sf.beep4j.transport.TransportContext;
55  
56  import org.slf4j.Logger;
57  import org.slf4j.LoggerFactory;
58  
59  /**
60   * Default implementation of the Session interface. Objects of this class are
61   * the central object in a BEEP session.
62   * 
63   * <ul>
64   *  <li>dispatch incoming messages</li>
65   *  <li>send outgoing messages</li>
66   *  <li>manage channel start and close</li>
67   * </ul>
68   * 
69   * @author Simon Raess
70   */
71  public class SessionImpl 
72  		implements MessageHandler, SessionManager, InternalSession, TransportContext, FrameHandlerFactory {
73  	
74  	private final Logger LOG = LoggerFactory.getLogger(SessionImpl.class);
75  	
76  	private final boolean initiator;
77  	
78  	private final Map<Integer,Sequence<Integer>> messageNumberSequences = new HashMap<Integer,Sequence<Integer>>();
79  	
80  	private final Map<Integer,LinkedList<ReplyHandlerHolder>> replyListeners = new HashMap<Integer,LinkedList<ReplyHandlerHolder>>();
81  	
82  	private final Map<String,Reply> responseHandlers = new HashMap<String,Reply>();
83  	
84  	private final Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
85  	
86  	private final Map<Integer,ChannelHandler> channelHandlers = new HashMap<Integer,ChannelHandler>(); 
87  	
88  	private final ChannelManagementProfile channelManagementProfile;
89  	
90  	private final TransportMapping mapping;
91  	
92  	private final SessionHandler sessionHandler;
93  	
94  	private final Sequence<Integer> channelNumberSequence;
95  	
96  	private final List<SessionListener> listeners = Collections.synchronizedList(new ArrayList<SessionListener>());
97  	
98  	private final StreamParser parser;
99  	
100 	private final Lock sessionLock = new ReentrantLock();
101 	
102 	private SessionState currentState;
103 	
104 	private SessionState initialState;
105 	
106 	private SessionState aliveState;
107 	
108 	private SessionState waitForResponseState;
109 	
110 	private SessionState deadState;
111 
112 	/**
113 	 * The greeting received from the other peer.
114 	 */
115 	private Greeting greeting;
116 	
	/**
	 * Creates a new session that starts out in the initial state.
	 * 
	 * @param initiator whether this peer is the initiator; selects the start
	 *        value of the local channel number sequence (1 if true, 2 otherwise)
	 * @param sessionHandler the application's session handler, checked with Assert.notNull
	 * @param mapping the transport mapping, checked with Assert.notNull
	 */
	public SessionImpl(boolean initiator, SessionHandler sessionHandler, TransportMapping mapping) {
		Assert.notNull("sessionHandler", sessionHandler);
		Assert.notNull("mapping", mapping);
		
		this.initiator = initiator;
		// the wrapper releases the session lock while the application handler runs
		this.sessionHandler = new SessionHandlerWrapper(sessionHandler);
		this.mapping = mapping;
		
		// Listeners must be registered before initChannelManagementProfile(),
		// which registers channel 0 and thereby fires channelStarted.
		addSessionListener(mapping);
		
		DelegatingFrameHandler frameHandler = new DelegatingFrameHandler(this);
		addSessionListener(frameHandler);
		
		this.channelManagementProfile = createChannelManagementProfile(initiator);
		initChannelManagementProfile();
		
		// step 2 keeps the numbers of this peer all odd or all even
		this.channelNumberSequence = new IntegerSequence(initiator ? 1 : 2, 2);
		this.parser = createStreamParser(frameHandler, mapping);
		
		initialState = new InitialState();
		aliveState = new AliveState();
		waitForResponseState = new WaitForResponseState();
		deadState = new DeadState();
		currentState = initialState;
	}
142 
143 	protected StreamParser createStreamParser(FrameHandler frameHandler, TransportMapping mapping) {
144 		return new DefaultStreamParser(frameHandler, mapping);
145 	}
146 	
147 	protected ChannelManagementProfile createChannelManagementProfile(boolean initiator) {
148 		return new ChannelManagementProfileImpl(initiator);
149 	}
150 
151 	protected void initChannelManagementProfile() {
152 		ChannelHandler channelHandler = channelManagementProfile.createChannelHandler(this);
153 		InternalChannel channel = createChannel(this, "", 0);
154 		channelHandler = initChannel(channel, channelHandler);
155 		channelHandler.channelOpened(channel);
156 		registerChannel(0, channel, channelHandler);
157 	}
158 		
159 	protected InternalChannel createChannel(InternalSession session, String profileUri, int channelNumber) {
160 		return new ChannelImpl(session, profileUri, channelNumber);
161 	}
162 	
163 	private ChannelHandler initChannel(InternalChannel channel, ChannelHandler handler) {
164 		return new ChannelHandlerWrapper(channel.initChannel(handler));
165 	}
166 
167 	protected Reply createReply(TransportMapping mapping, int channelNumber, int messageNumber) {
168 		Reply responseHandler = new DefaultReply(mapping, channelNumber, messageNumber);
169 		setReply(channelNumber, messageNumber, responseHandler);
170 		return responseHandler;
171 	}
172 	
	/**
	 * Acquires the session lock. Every call must be paired with a call to
	 * unlock(), normally in a finally block.
	 */
	protected void lock() {
		sessionLock.lock();
	}
176 	
	/**
	 * Releases the session lock previously acquired with lock().
	 */
	protected void unlock() {
		sessionLock.unlock();
	}
180 	
181 	private String traceInfo() {
182 		StringBuilder builder = new StringBuilder();
183 		builder.append("[").append(System.identityHashCode(this));
184 		builder.append("|").append(currentState);
185 		builder.append("|").append(initiator).append("] ");
186 		return builder.toString();
187 	}
188 	
189 	private void debug(Object... messages) {
190 		if (LOG.isDebugEnabled()) {
191 			StringBuffer buffer = new StringBuffer();
192 			for (Object message : messages) {
193 				buffer.append(message);
194 			}
195 			LOG.debug(buffer.toString());
196 		}
197 	}
198 	
199 	private void info(String message) {
200 		if (LOG.isInfoEnabled()) {
201 			LOG.info(traceInfo() + message);
202 		}
203 	}
204 	
205 	private void warn(String message, Exception e) {
206 		LOG.warn(traceInfo() + message, e);
207 	}
208 	
209 	private void setCurrentState(SessionState currentState) {
210 		debug("setting session state from ", this.currentState, " to ", currentState);
211 		this.currentState = currentState;
212 	}
213 
	/**
	 * @return the state that state dependent operations are delegated to
	 */
	private SessionState getCurrentState() {
		return currentState;
	}
217 
	/**
	 * Adds a listener that is notified when channels are started or closed.
	 * 
	 * @param l the listener to add
	 */
	protected void addSessionListener(SessionListener l) {
		listeners.add(l);
	}
221 	
222 	protected void fireChannelStarted(int channel) {
223 		SessionListener[] list = listeners.toArray(new SessionListener[listeners.size()]);
224 		for (int i = 0; i < list.length; i++) {
225 			SessionListener listener = list[i];
226 			listener.channelStarted(channel);
227 		}
228 		
229 	}
230 
231 	protected void fireChannelClosed(int channel) {
232 		SessionListener[] list = listeners.toArray(new SessionListener[listeners.size()]);
233 		for (int i = 0; i < list.length; i++) {
234 			SessionListener listener = list[i];
235 			listener.channelClosed(channel);
236 		}
237 		
238 	}
239 	
240 	private int getNextChannelNumber() {
241 		return channelNumberSequence.next();
242 	}
243 	
244 	private void validateChannelNumber(int number) {
245 		if (number <= 0) {
246 			throw new ProtocolException(number + " is an illegal channel number");
247 		}
248 		if (initiator && number % 2 != 0) {
249 			throw new ProtocolException("channel numbers of listener peer "
250 					+ "must be even numbered");
251 		} else if (!initiator && number % 2 != 1) {
252 			throw new ProtocolException("channel numbers of initator peer "
253 					+ "must be odd numbered");
254 		}
255 	}
256 	
257 	private boolean hasOpenChannels() {
258 		return channels.size() > 1;
259 	}
260 
261 	private void registerChannel(int channelNumber, Channel channel, ChannelHandler handler) {
262 		channels.put(channelNumber, channel);
263 		channelHandlers.put(channelNumber, handler);
264 		messageNumberSequences.put(channelNumber, new IntegerSequence(1, 1));
265 		replyListeners.put(channelNumber, new LinkedList<ReplyHandlerHolder>());
266 		fireChannelStarted(channelNumber);
267 	}
268 
269 	private void unregisterChannel(int channelNumber) {
270 		channels.remove(channelNumber);
271 		channelHandlers.remove(channelNumber);
272 		messageNumberSequences.remove(channelNumber);
273 		replyListeners.remove(channelNumber);
274 		fireChannelClosed(channelNumber);
275 	}
276 
277 	private void registerReplyListener(int channelNumber, int messageNumber, ReplyHandler listener) {
278 		LinkedList<ReplyHandlerHolder> expectedReplies = replyListeners.get(channelNumber);
279 		expectedReplies.addLast(new ReplyHandlerHolder(messageNumber, listener));
280 	}
281 	
	/**
	 * Pairs a ReplyHandler with the number of the message it expects a reply
	 * for. Each receivedXXX method first checks the message number of the
	 * reply and then invokes the handler with the session lock released; the
	 * lock is re-acquired before the method returns, even if the handler
	 * throws. The methods assume that the calling thread holds the lock.
	 */
	private class ReplyHandlerHolder {
		private final int messageNumber;
		private final ReplyHandler replyHandler;
		protected ReplyHandlerHolder(int messageNumber, ReplyHandler listener) {
			this.messageNumber = messageNumber;
			this.replyHandler = listener;
		}
		protected void receivedANS(int channelNumber, int messageNumber, Message message) {
			validateMessageNumber(channelNumber, messageNumber);
			unlock();
			try {
				replyHandler.receivedANS(message);
			} finally {
				lock();
			}
		}
		protected void receivedNUL(int channelNumber, int messageNumber) {
			validateMessageNumber(channelNumber, messageNumber);
			unlock();
			try {
				replyHandler.receivedNUL();
			} finally {
				lock();
			}
		}
		protected void receivedERR(int channelNumber, int messageNumber, Message message) {
			validateMessageNumber(channelNumber, messageNumber);
			unlock();
			try {
				replyHandler.receivedERR(message);
			} finally {
				lock();
			}
		}
		protected void receivedRPY(int channelNumber, int messageNumber, Message message) {
			validateMessageNumber(channelNumber, messageNumber);
			unlock();
			try {
				replyHandler.receivedRPY(message);
			} finally {
				lock();
			}
		}
		/**
		 * Throws a ProtocolException unless the reply carries the message
		 * number this holder was created for.
		 */
		private void validateMessageNumber(int channelNumber, int messageNumber) {
			if (this.messageNumber != messageNumber) {
				throw new ProtocolException("next expected reply on channel "
						+ channelNumber + " must have message number "
						+ this.messageNumber + " but was "
						+ messageNumber);
			}
		}
	}
334 	
335 	private ReplyHandlerHolder unregisterReplyListener(int channelNumber) {
336 		LinkedList<ReplyHandlerHolder> listeners = replyListeners.get(channelNumber);
337 		return listeners.removeFirst();
338 	}
339 
340 	private int getNextMessageNumber(int channelNumber) {
341 		Sequence<Integer> sequence = getMessageNumberSequence(channelNumber);
342 		Integer next = sequence.next();
343 		return next;
344 	}
345 
346 	private Sequence<Integer> getMessageNumberSequence(int channelNumber) {
347 		Sequence<Integer> result = messageNumberSequences.get(channelNumber);
348 		if (result == null) {
349 			throw new InternalException("no open channel with channel number " + channelNumber);
350 		}
351 		return result;
352 	}
353 	
354 	private ReplyHandlerHolder getReplyListener(int channelNumber, int messageNumber) {
355 		LinkedList<ReplyHandlerHolder> listeners = replyListeners.get(channelNumber);
356 		if (listeners.isEmpty()) {
357 			throw new ProtocolException("received a reply but expects no outstanding replies");
358 		}
359 		return listeners.getFirst();
360 	}
361 	
362 	private Reply getReply(int channelNumber, int messageNumber) {
363 		Reply handler = responseHandlers.get(key(channelNumber, messageNumber));
364 		return handler;
365 	}
366 	
367 	private void setReply(int channelNumber, int messageNumber, Reply responseHandler) {
368 		responseHandlers.put(key(channelNumber, messageNumber), responseHandler);
369 	}
370 	
371 	private ChannelHandler getChannelHandler(int channelNumber) {
372 		return channelHandlers.get(channelNumber);
373 	}
374 	
375 	private String key(int channelNumber, int messageNumber) {
376 		return channelNumber + ":" + messageNumber;
377 	}
378 	
379 	private void replyCompleted(int channelNumber, int messageNumber) {
380 		responseHandlers.remove(key(channelNumber, messageNumber));
381 	}
382 
383 	
384 	// --> start of Session methods <--
385 	
386 	public String[] getProfiles() {
387 		if (greeting == null) {
388 			throw new IllegalStateException("greeting has not yet been received");
389 		}
390 		return greeting.getProfiles();
391 	}
392 	
393 	public void startChannel(String profileUri, ChannelHandler handler) {
394 		startChannel(new ProfileInfo(profileUri), handler);
395 	}
396 	
397 	public void startChannel(final ProfileInfo profile, final ChannelHandler handler) {
398 		startChannel(new ProfileInfo[] { profile }, new ChannelHandlerFactory() {
399 			public ChannelHandler createChannelHandler(ProfileInfo info) {
400 				if (!profile.getUri().equals(info.getUri())) {
401 					throw new IllegalArgumentException("profile URIs do not match: "
402 							+ profile.getUri() + " | " + info.getUri());
403 				}
404 				return handler;
405 			}
406 			public void startChannelFailed(int code, String message) {
407 				unlock();
408 				try {
409 					handler.channelStartFailed(code, message);
410 				} finally {
411 					lock();
412 				}
413 			}
414 		});
415 	}
416 	
417 	public void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory) {
418 		lock();
419 		try {
420 			getCurrentState().startChannel(profiles, factory);
421 		} finally {
422 			unlock();
423 		}
424 	}
425 	
426 	public void close() {
427 		lock();
428 		try {
429 			getCurrentState().closeSession();
430 		} finally {
431 			unlock();
432 		}
433 	}
434 	
435 	// --> end of Session methods <--
436 	
437 	
438 	// --> start of InternalSession methods <--
439 	
440 	/*
441 	 * This method is called by the channel implementation to send a message on
442 	 * a particular channel to the other peer. It takes care to:
443 	 * - generate a message number
444 	 * - register the reply listener under that number
445 	 * - pass the message to the underlying transport mapping
446 	 */	
447 	public void sendMessage(int channelNumber, Message message, ReplyHandler listener) {
448 		lock();
449 		try {
450 			getCurrentState().sendMessage(channelNumber, message, listener);
451 		} finally {
452 			unlock();
453 		}
454 	}
455 
456 	/*
457 	 * This method is called by the channel implementation to send a close channel
458 	 * request to the other peer.
459 	 */
460 	public void requestChannelClose(final int channelNumber, final CloseChannelCallback callback) {
461 		Assert.notNull("callback", callback);
462 		lock();
463 		try {
464 			channelManagementProfile.closeChannel(channelNumber, new CloseChannelCallback() {
465 				public void closeDeclined(int code, String message) {
466 					callback.closeDeclined(code, message);
467 				}
468 				public void closeAccepted() {
469 					unregisterChannel(channelNumber);
470 					callback.closeAccepted();
471 				}
472 			});
473 		} finally {
474 			unlock();
475 		}
476 	}
477 	
478 	// --> end of InternalSession methods <--
479 	
480 	
481 	// --> start of FrameHandlerFactory methods <--
482 	
483 	public FrameHandler createFrameHandler() {
484 		return new MessageAssembler(this);
485 	}
486 	
487 	// --> end of FrameHandlerFactory methods <--
488 	
489 	
490 	// --> start of SessionManager methods <--
491 	
492 	/*
493 	 * This method is invoked by the ChannelManagementProfile when the other
494 	 * peer requests creating a new channel.
495 	 */
496 	public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
497 		return getCurrentState().channelStartRequested(channelNumber, profiles);
498 	}
499 	
500 	/*
501 	 * This method is invoked by the ChannelManagement profile when a channel
502 	 * close request is received. This request is passed on to the ChannelHandler,
503 	 * that is the application, which decides what to do with the request to
504 	 * close the channel.
505 	 */
506 	public void channelCloseRequested(final int channelNumber, final CloseChannelRequest request) {
507 		lock();
508 		try {
509 			ChannelHandler handler = getChannelHandler(channelNumber);
510 			handler.channelCloseRequested(new CloseChannelRequest() {
511 				public void reject() {
512 					request.reject();
513 				}		
514 				public void accept() {
515 					request.accept();
516 					unregisterChannel(channelNumber);
517 				}
518 			});
519 		} finally {
520 			unlock();
521 		}
522 	}
523 	
524 	public void sessionCloseRequested(CloseCallback callback) {
525 		lock();
526 		try {
527 			getCurrentState().sessionCloseRequested(callback);
528 		} finally {
529 			unlock();
530 		}
531 	}
532 	
533 	// --> end of SessionManager methods <--
534 	
535 	
536 	// --> start of MessageHandler methods <-- 
537 
538 	public void receiveMSG(int channelNumber, int messageNumber, Message message) {
539 		debug("received MSG: channel=", channelNumber, ",message=",  messageNumber);
540 		getCurrentState().receiveMSG(channelNumber, messageNumber, message);
541 	}
542 
543 	public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
544 		debug("received ANS: channel=", channelNumber, ",message=", messageNumber, ",answer=", answerNumber);
545 		getCurrentState().receiveANS(channelNumber, messageNumber, answerNumber, message);
546 	}
547 	
548 	public void receiveNUL(int channelNumber, int messageNumber) {
549 		debug("received NUL: channel=", channelNumber, ",message=", messageNumber);
550 		getCurrentState().receiveNUL(channelNumber, messageNumber);
551 	}
552 
553 	public void receiveERR(int channelNumber, int messageNumber, Message message) {
554 		debug("received ERR: channel=", channelNumber, ",message=", messageNumber);
555 		getCurrentState().receiveERR(channelNumber, messageNumber, message);
556 	}
557 		
558 	public void receiveRPY(int channelNumber, int messageNumber, Message message) {
559 		debug("received RPY: channel=", channelNumber, ",message=", messageNumber);
560 		getCurrentState().receiveRPY(channelNumber, messageNumber, message);
561 	}
562 	
563 	// --> end of MessageHandler methods <--
564 	
565 	// --> channel handler wrapper <--
566 	
	/**
	 * ChannelHandler decorator that releases the session lock before it
	 * delegates to the target handler and re-acquires the lock afterwards,
	 * even if the target throws. Every method assumes that the calling
	 * thread holds the session lock.
	 */
	private class ChannelHandlerWrapper implements ChannelHandler {
		
		private final ChannelHandler target;
		
		private ChannelHandlerWrapper(ChannelHandler target) {
			Assert.notNull("target", target);
			this.target = target;
		}
		
		public void channelOpened(Channel c) {
			unlock();
			try {
				target.channelOpened(c);
			} finally {
				lock();
			}
		}
		
		public void channelStartFailed(int code, String message) {
			unlock();
			try {
				target.channelStartFailed(code, message);
			} finally {
				lock();
			}
		}
		
		public void messageReceived(Message message, Reply reply) {
			unlock();
			try {
				target.messageReceived(message, reply);
			} finally {
				lock();
			}
		}
		
		public void channelCloseRequested(CloseChannelRequest request) {
			unlock();
			try {
				target.channelCloseRequested(request);
			} finally {
				lock();
			}
		}
		
		public void channelClosed() {
			unlock();
			try {
				target.channelClosed();
			} finally {
				lock();
			}
		}
		
	}
622 	
	/**
	 * SessionHandler decorator that releases the session lock before it
	 * delegates to the target handler and re-acquires the lock afterwards,
	 * even if the target throws. Every method assumes that the calling
	 * thread holds the session lock.
	 */
	private class SessionHandlerWrapper implements SessionHandler {
		
		private final SessionHandler target;
		
		// no null check here: the SessionImpl constructor validates the handler
		private SessionHandlerWrapper(SessionHandler target) {
			this.target = target;
		}
		
		public void connectionEstablished(StartSessionRequest s) {
			unlock();
			try {
				target.connectionEstablished(s);
			} finally {
				lock();
			}
		}
		
		public void sessionOpened(Session s) {
			unlock();
			try {
				target.sessionOpened(s);
			} finally {
				lock();
			}
		}
		
		public void sessionStartDeclined(int code, String message) {
			unlock();
			try {
				target.sessionStartDeclined(code, message);
			} finally {
				lock();
			}
		}
		
		public void channelStartRequested(StartChannelRequest request) {
			unlock();
			try {
				target.channelStartRequested(request);
			} finally {
				lock();
			}
		}
		
		public void sessionClosed() {
			unlock();
			try {
				target.sessionClosed();
			} finally {
				lock();
			}
		}
		
	}
677 	
678 	// --> start of TransportContext methods <--
679 	
680 	/*
681 	 * Notifies the ChannelManagementProfile about this event. The
682 	 * ChannelManagementProfile then asks the application (SessionHandler)
683 	 * whether to accept the connection and sends the appropriate response.
684 	 */
685 	public void connectionEstablished(SocketAddress address) {
686 		lock();
687 		try {
688 			getCurrentState().connectionEstablished(address);
689 		} finally {
690 			unlock();
691 		}
692 	}
693 	
	/**
	 * Called by the transport when it caught an exception. Currently the
	 * exception is only logged; the session state is not changed.
	 * 
	 * @param cause the exception caught by the transport
	 */
	public void exceptionCaught(Throwable cause) {
		// TODO: implement this method
		LOG.warn("exception caught by transport", cause);
	}
698 	
	/**
	 * Called by the transport when data has been received. The buffer is
	 * handed to the parser while the session lock is held. A
	 * ProtocolException raised during processing ends the session: the
	 * session handler is notified, the session becomes dead and the
	 * transport is closed.
	 * 
	 * @param buffer the received data
	 */
	public void messageReceived(ByteBuffer buffer) {
		lock();
		try {
			parser.process(buffer);
		} catch (ProtocolException e) {
			warn("dropping connection because of a protocol exception", e);
			try {
				sessionHandler.sessionClosed();
			} finally {
				// runs even if the application's sessionClosed() throws
				setCurrentState(deadState);
				mapping.closeTransport();
			}
		} finally {
			unlock();
		}
	}
715 	
716 	public void connectionClosed() {
717 		lock();
718 		try {
719 			getCurrentState().connectionClosed();
720 		} finally {
721 			unlock();
722 		}
723 	}
724 	
725 	// --> end of TransportContext methods <--
726 
	/**
	 * The state dependent part of the session. SessionImpl forwards the
	 * corresponding events and requests to its current state; incoming
	 * messages arrive through the inherited MessageHandler methods.
	 */
	protected static interface SessionState extends MessageHandler {
		
		/** The transport reports that the connection has been established. */
		void connectionEstablished(SocketAddress address);
		
		/** The application requests to start a channel. */
		void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory);
		
		/** A channel requests to send a message to the other peer. */
		void sendMessage(int channelNumber, Message message, ReplyHandler listener);
		
		/** The other peer requests to start a channel. */
		StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles);
		
		/** The application requests to close the session. */
		void closeSession();
		
		/** A close of the session has been requested; the callback is passed on by SessionImpl. */
		void sessionCloseRequested(CloseCallback callback);
		
		/** The transport reports that the connection has been closed. */
		void connectionClosed();
		
	}
744 	
745 	protected abstract class AbstractSessionState implements SessionState {
746 		
747 		public abstract String getName();
748 		
749 		public void connectionEstablished(SocketAddress address) {
750 			throw new IllegalStateException("connection already established, state=<" 
751 					+ getName() + ">");
752 		}
753 		
754 		public void startChannel(ProfileInfo[] profiles, ChannelHandlerFactory factory) {
755 			throw new IllegalStateException("" +
756 					"cannot start channel in state <" + getName() + ">");
757 		}
758 		
759 		public void sendMessage(int channelNumber, Message message, ReplyHandler listener) {
760 			throw new IllegalStateException(
761 					"cannot send messages in state <" + getName() + ">: channel="
762 					+ channelNumber);
763 		}
764 		
765 		public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
766 			return StartChannelResponse.createCancelledResponse(550, "cannot start channel");
767 		}
768 
769 		public void receiveANS(int channelNumber, int messageNumber,
770 				int answerNumber, Message message) {
771 			throw new IllegalStateException(
772 					"internal error: unexpected method invocation in state <" + getName() + ">: "
773 					+ "message ANS, channel=" + channelNumber 
774 					+ ",message=" + messageNumber
775 					+ ",answerNumber=" + answerNumber);
776 		}
777 
778 		public void receiveERR(int channelNumber, int messageNumber, Message message) {
779 			throw new IllegalStateException(
780 					"internal error: unexpected method invocation in state <" + getName() + ">: "
781 					+ "message ERR, channel=" + channelNumber + ",message=" + messageNumber);
782 		}
783 
784 		public void receiveMSG(int channelNumber, int messageNumber, Message message) {
785 			throw new IllegalStateException(
786 					"internal error: unexpected method invocation in state <" + getName() + ">: "
787 					+ "message MSG, channel=" + channelNumber + ",message=" + messageNumber);
788 		}
789 
790 		public void receiveNUL(int channelNumber, int messageNumber) {
791 			throw new IllegalStateException(
792 					"internal error: unexpected method invocation in state <" + getName() + ">: "
793 					+ "message NUL, channel=" + channelNumber + ",message=" + messageNumber);
794 		}
795 
796 		public void receiveRPY(int channelNumber, int messageNumber, Message message) {
797 			throw new IllegalStateException(
798 					"internal error: unexpected method invocation in state <" + getName() + ">: "
799 					+ "message RPY, channel=" + channelNumber + ",message=" + messageNumber);
800 		}
801 		
802 		public void closeSession() {
803 			throw new IllegalStateException("cannot close session");
804 		}
805 		
806 		public void sessionCloseRequested(CloseCallback callback) {
807 			throw new IllegalStateException("cannot close session");
808 		}
809 
810 	}
811 
812 	protected class InitialState extends AbstractSessionState {
813 		
814 		@Override
815 		public String getName() {
816 			return "initial";
817 		}
818 		
819 		public void connectionEstablished(SocketAddress address) {
820 			Reply reply = new InitialReply(mapping);
821 			setReply(0, 0, reply);
822 			if (!channelManagementProfile.connectionEstablished(address, sessionHandler, reply)) {
823 				setCurrentState(deadState);
824 				mapping.closeTransport();
825 			}
826 		}
827 		
828 		public void receiveMSG(int channelNumber, int messageNumber, Message message) {
829 			throw new ProtocolException(
830 					"first message in a session must be RPY or ERR on channel 0: "
831 					+ "was MSG channel=" + channelNumber + ",message=" + messageNumber);
832 		}
833 		
834 		public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
835 			throw new ProtocolException(
836 					"first message in a session must be RPY or ERR on channel 0: "
837 					+ "was ANS channel=" + channelNumber + ",message=" + messageNumber);
838 		}
839 		
840 		public void receiveNUL(int channelNumber, int messageNumber) {
841 			throw new ProtocolException(
842 					"first message in a session must be RPY or ERR on channel 0: "
843 					+ "was NUL channel=" + channelNumber + ",message=" + messageNumber);
844 		}
845 		
846 		public void receiveRPY(int channelNumber, int messageNumber, Message message) {
847 			validateMessage(channelNumber, messageNumber);
848 			greeting = channelManagementProfile.receivedGreeting(message);
849 			setCurrentState(aliveState);
850 			sessionHandler.sessionOpened(SessionImpl.this);
851 		}
852 		
853 		public void receiveERR(int channelNumber, int messageNumber, Message message) {
854 			validateMessage(channelNumber, messageNumber);
855 			BEEPError error = channelManagementProfile.receivedError(message);
856 			
857 			info("received error, session start failed: " + error.getCode() + ":"
858 					+ error.getMessage());
859 			
860 			sessionHandler.sessionStartDeclined(error.getCode(), error.getMessage());
861 			setCurrentState(deadState);
862 			mapping.closeTransport();
863 		}
864 
865 		private void validateMessage(int channelNumber, int messageNumber) {
866 			if (channelNumber != 0 || messageNumber != 0) {
867 				throw new ProtocolException("first message in session must be sent on "
868 						+ "channel 0 with message number 0: was channel " + channelNumber
869 						+ ",message=" + messageNumber);
870 			}
871 		}
872 		
873 		public void connectionClosed() {
874 			setCurrentState(deadState);
875 		}
876 		
877 		@Override
878 		public String toString() {
879 			return "<initial>";
880 		}
881 
882 	}
883 	
884 	protected class AliveState extends AbstractSessionState {
885 		
		/**
		 * @return the name of this state as used in error messages
		 */
		@Override
		public String getName() {
			return "alive";
		}
890 		
891 		@Override
892 		public void startChannel(final ProfileInfo[] profiles, final ChannelHandlerFactory factory) {
893 			final int channelNumber = getNextChannelNumber();
894 			channelManagementProfile.startChannel(channelNumber, profiles, new StartChannelCallback() {
895 				public void channelCreated(ProfileInfo info) {
896 					ChannelHandler handler = factory.createChannelHandler(info);
897 					InternalChannel channel = createChannel(
898 							SessionImpl.this, info.getUri(), channelNumber);
899 					ChannelHandler channelHandler = initChannel(channel, handler);
900 					registerChannel(channelNumber, channel, channelHandler);
901 					channelHandler.channelOpened(channel);
902 				}
903 				public void channelFailed(int code, String message) {
904 					factory.startChannelFailed(code, message);
905 				}
906 			});
907 		}
908 		
909 		@Override
910 		public void sendMessage(int channelNumber, Message message, ReplyHandler listener) {
911 			int messageNumber = getNextMessageNumber(channelNumber);
912 			debug("send message: channel=", channelNumber, ",message=", messageNumber);
913 			registerReplyListener(channelNumber, messageNumber, listener);
914 			mapping.sendMSG(channelNumber, messageNumber, message);
915 		}
916 		
917 		@Override
918 		public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
919 			validateChannelNumber(channelNumber);
920 
921 			debug("start of channel ", channelNumber, " requested by remote peer: ", Arrays.toString(profiles));
922 			DefaultStartChannelRequest request = new DefaultStartChannelRequest(profiles);
923 			sessionHandler.channelStartRequested(request);
924 			
925 			StartChannelResponse response = request.getResponse();
926 			
927 			if (response.isCancelled()) {
928 				debug("start of channel ", channelNumber, " is cancelled by application: ", response.getCode(), 
929 						",'", response.getMessage(), "'");
930 				return response;
931 			}
932 			
933 			ProfileInfo info = response.getProfile();
934 			if (info == null) {
935 				throw new ProtocolException("StartChannelRequest must be either cancelled or a profile must be selected");
936 			}
937 			
938 			debug("start of channel ", channelNumber, " is accepted by application: ", info.getUri());
939 			
940 			InternalChannel channel = createChannel(SessionImpl.this, info.getUri(), channelNumber);
941 			ChannelHandler handler = channel.initChannel(response.getChannelHandler());
942 			handler.channelOpened(channel);
943 			registerChannel(channelNumber, channel, handler);
944 			
945 			return response;
946 		}
947 		
948 		@Override
949 		public void receiveMSG(int channelNumber, int messageNumber, Message message) {
950 			Reply reply = getReply(channelNumber, messageNumber);
951 			if (reply != null) {
952 				// Validation of frames according to the BEEP specification section 2.2.1.1.
953 				//
954 				// A frame is poorly formed if the header starts with "MSG", and 
955 				// the message number refers to a "MSG" message that has been 
956 				// completely received but for which a reply has not been completely sent.
957 				throw new ProtocolException("Message number " + messageNumber
958 						+ " on channel " + channelNumber + " refers to a MSG message "
959 						+ "that has been received but for which a reply has not been "
960 						+ "completely sent.");
961 			}
962 			reply = createReply(mapping, channelNumber, messageNumber);
963 			
964 			ChannelHandler handler = getChannelHandler(channelNumber);
965 			handler.messageReceived(message, reply);
966 		}
967 
968 		@Override
969 		public void receiveANS(int channelNumber, int messageNumber, int answerNumber, Message message) {
970 			ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
971 			listener.receivedANS(channelNumber, messageNumber, message);
972 		}
973 		
974 		@Override
975 		public void receiveNUL(int channelNumber, int messageNumber) {
976 			ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
977 			try {
978 				listener.receivedNUL(channelNumber, messageNumber);
979 			} finally {
980 				unregisterReplyListener(channelNumber);
981 			}
982 		}
983 
984 		@Override
985 		public void receiveERR(int channelNumber, int messageNumber, Message message) {
986 			ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
987 			try {
988 				listener.receivedERR(channelNumber, messageNumber, message);
989 			} finally {
990 				unregisterReplyListener(channelNumber);
991 			}
992 		}
993 
994 		@Override
995 		public void receiveRPY(int channelNumber, int messageNumber, Message message) {
996 			ReplyHandlerHolder listener = getReplyListener(channelNumber, messageNumber);
997 			try {
998 				listener.receivedRPY(channelNumber, messageNumber, message);
999 			} finally {
1000 				unregisterReplyListener(channelNumber);
1001 			}
1002 		}
1003 		
1004 		@Override
1005 		public void closeSession() {
1006 			setCurrentState(waitForResponseState);
1007 			channelManagementProfile.closeSession(new CloseCallback() {
1008 				public void closeDeclined(int code, String message) {
1009 					debug("close session declined by remote peer: " + code + ":" + message);
1010 					performClose();
1011 				}
1012 			
1013 				public void closeAccepted() {
1014 					debug("close session accepted by remote peer");
1015 					performClose();
1016 				}
1017 				
1018 				private void performClose() {
1019 					try {
1020 						sessionHandler.sessionClosed();
1021 					} finally {
1022 						setCurrentState(deadState);
1023 						mapping.closeTransport();
1024 					}
1025 				}
1026 			});
1027 		}
1028 		
1029 		@Override
1030 		public void sessionCloseRequested(CloseCallback callback) {
1031 			if (hasOpenChannels()) {
1032 				callback.closeDeclined(550, "still working");
1033 			} else {
1034 				callback.closeAccepted();
1035 				try {
1036 					sessionHandler.sessionClosed();
1037 				} finally {
1038 					setCurrentState(deadState);
1039 					mapping.closeTransport();
1040 				}
1041 			}
1042 		}
1043 		
1044 		public void connectionClosed() {
1045 			try {
1046 				sessionHandler.sessionClosed();
1047 			} finally {
1048 				setCurrentState(deadState);
1049 			}
1050 		}
1051 		
1052 		@Override
1053 		public String toString() {
1054 			return "<alive>";
1055 		}
1056 		
1057 	}
1058 	
1059 	protected class WaitForResponseState extends AliveState {
1060 		
1061 		@Override
1062 		public String getName() {
1063 			return "close-initiated";
1064 		}
1065 		
1066 		@Override
1067 		public void sessionCloseRequested(CloseCallback callback) {
1068 			debug("received session close request while close is already in progress");
1069 			try {
1070 				sessionHandler.sessionClosed();
1071 			} finally {
1072 				callback.closeAccepted();
1073 				mapping.closeTransport();
1074 				setCurrentState(deadState);
1075 			}
1076 		}
1077 		
1078 		@Override
1079 		public StartChannelResponse channelStartRequested(int channelNumber, ProfileInfo[] profiles) {
1080 			return StartChannelResponse.createCancelledResponse(550, "session release in progress");
1081 		}
1082 		
1083 		@Override
1084 		public String toString() {
1085 			return "<wait-for-response>";
1086 		}
1087 		
1088 	}
1089 
1090 	protected class DeadState extends AbstractSessionState {
1091 		
1092 		@Override
1093 		public String getName() {
1094 			return "dead";
1095 		}
1096 		
1097 		public void connectionClosed() {
1098 			// ignore this one
1099 		}
1100 		
1101 		@Override
1102 		public String toString() {
1103 			return "<dead>";
1104 		}
1105 				
1106 	}
1107 	
1108 	protected class DefaultReply implements Reply {
1109 		
1110 		private final TransportMapping mapping;
1111 		
1112 		private final int channel;
1113 		
1114 		private final int messageNumber;
1115 		
1116 		private int answerNumber = 0;
1117 		
1118 		private boolean complete;
1119 		
1120 		public DefaultReply(TransportMapping mapping, int channel, int messageNumber) {
1121 			Assert.notNull("mapping", mapping);
1122 			this.mapping = mapping;
1123 			this.channel = channel;
1124 			this.messageNumber = messageNumber;
1125 		}
1126 		
1127 		private void checkCompletion() {
1128 			if (complete) {
1129 				throw new IllegalStateException("a complete reply has already been sent");
1130 			}
1131 		}
1132 
1133 		private void complete() {
1134 			complete = true;
1135 			replyCompleted(channel, messageNumber);
1136 		}
1137 		
1138 		public MessageBuilder createMessageBuilder() {
1139 			return new DefaultMessageBuilder();
1140 		}
1141 
1142 		public void sendANS(Message message) {
1143 			Assert.notNull("message", message);
1144 			lock();
1145 			try {
1146 				checkCompletion();
1147 				mapping.sendANS(channel, messageNumber, answerNumber++, message);
1148 			} finally {
1149 				unlock();
1150 			}
1151 		}
1152 		
1153 		public void sendERR(Message message) {
1154 			Assert.notNull("message", message);
1155 			lock();
1156 			try {
1157 				checkCompletion();
1158 				mapping.sendERR(channel, messageNumber, message);
1159 				complete();
1160 			} finally {
1161 				unlock();
1162 			}
1163 		}
1164 		
1165 		public void sendNUL() {
1166 			lock();
1167 			try {
1168 				checkCompletion();
1169 				mapping.sendNUL(channel, messageNumber);
1170 				complete();
1171 			} finally {
1172 				unlock();
1173 			}
1174 		}
1175 		
1176 		public void sendRPY(Message message) {
1177 			Assert.notNull("message", message);
1178 			lock();
1179 			try {
1180 				checkCompletion();
1181 				mapping.sendRPY(channel, messageNumber, message);
1182 				complete();
1183 			} finally {
1184 				unlock();
1185 			}
1186 		}
1187 		
1188 	}
1189 	
1190 	protected class InitialReply extends DefaultReply {
1191 		
1192 		public InitialReply(TransportMapping mapping) {
1193 			super(mapping, 0, 0);
1194 		}
1195 		
1196 		@Override
1197 		public void sendANS(Message message) {
1198 			throw new InternalException("ANS is not a valid initial response");
1199 		}
1200 		
1201 		@Override
1202 		public void sendNUL() {
1203 			throw new InternalException("NUL is not a valid initial response");
1204 		}
1205 		
1206 	}
1207 	
1208 }