View Javadoc

1   /*
2    *  Copyright 2006 Simon Raess
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.beep4j.internal;
17  
18  import net.sf.beep4j.Channel;
19  import net.sf.beep4j.ChannelHandler;
20  import net.sf.beep4j.CloseChannelCallback;
21  import net.sf.beep4j.CloseChannelRequest;
22  import net.sf.beep4j.Message;
23  import net.sf.beep4j.MessageBuilder;
24  import net.sf.beep4j.Reply;
25  import net.sf.beep4j.ReplyHandler;
26  import net.sf.beep4j.Session;
27  import net.sf.beep4j.internal.message.DefaultMessageBuilder;
28  import net.sf.beep4j.internal.util.Assert;
29  
30  class ChannelImpl implements Channel, ChannelHandler, InternalChannel {
31  	
32  	private final InternalSession session;
33  	
34  	private final String profile;
35  	
36  	private final int channelNumber;
37  	
38  	private ChannelHandler channelHandler;
39  	
40  	private State state = new Alive();
41  	
42  	/**
43  	 * Counter that counts how many messages we have sent but to which we
44  	 * have not received a reply.
45  	 */
46  	private int outstandingReplyCount;
47  	
48  	/**
49  	 * Counter that counts how many messages we have received but to which
50  	 * we have not sent a response.
51  	 */
52  	private int outstandingResponseCount;
53  	
54  	public ChannelImpl(
55  			InternalSession session, 
56  			String profile, 
57  			int channelNumber) {
58  		this.session = session;
59  		this.profile = profile;
60  		this.channelNumber = channelNumber;
61  	}
62  	
63  	public ChannelHandler initChannel(ChannelHandler channelHandler) {
64  		Assert.notNull("channelHandler", channelHandler);
65  		this.channelHandler = channelHandler;
66  		return this;
67  	}
68  	
69  	public boolean isAlive() {
70  		return state instanceof Alive;
71  	}
72  	
73  	public boolean isDead() {
74  		return state instanceof Dead;
75  	}
76  	
77  	public boolean isShuttingDown() {
78  		return !isAlive() && !isDead();
79  	}
80  	
81  	public String getProfile() {
82  		return profile;
83  	}
84  
85  	public Session getSession() {
86  		return session;
87  	}
88  	
89  	public MessageBuilder createMessageBuilder() {
90  		return new DefaultMessageBuilder();
91  	}
92  	
93  	protected void setState(State state) {
94  		this.state = state;
95  		this.state.checkCondition();
96  	}
97  	
98  	public void sendMessage(Message message, ReplyHandler reply) {
99  		Assert.notNull("message", message);
100 		Assert.notNull("listener", reply);
101 		state.sendMessage(message, new ReplyHandlerWrapper(reply));
102 	}
103 	
104 	public void close(CloseChannelCallback callback) {
105 		Assert.notNull("callback", callback);
106 		state.closeInitiated(callback);
107 	}
108 	
109 	public void channelClosed() {
110 		channelHandler.channelClosed();
111 	}
112 	
113 	public void channelStartFailed(int code, String message) {
114 		channelHandler.channelStartFailed(code, message);
115 	}
116 	
117 	public void channelOpened(Channel c) {
118 		channelHandler.channelOpened(this);		
119 	}
120 	
121 	public void messageReceived(Message message, Reply reply) {
122 		state.messageReceived(message, new ReplyWrapper(reply));		
123 	}
124 	
125 	public void channelCloseRequested(CloseChannelRequest request) {
126 		state.closeRequested(request);
127 	}
128 	
129 	private synchronized void incrementOutstandingReplyCount() {
130 		outstandingReplyCount++;
131 	}
132 	
133 	private synchronized void decrementOutstandingReplyCount() {
134 		outstandingReplyCount--;
135 		state.checkCondition();
136 	}
137 	
138 	private synchronized boolean hasOutstandingReplies() {
139 		return outstandingReplyCount > 0;
140 	}
141 	
142 	private synchronized void incrementOutstandingResponseCount() {
143 		outstandingResponseCount++;
144 	}
145 	
146 	private synchronized void decrementOutstandingResponseCount() {
147 		outstandingResponseCount--;
148 		state.checkCondition();
149 	}
150 	
151 	private synchronized boolean hasOutstandingResponses() {
152 		return outstandingResponseCount > 0;
153 	}
154 	
155 	private synchronized boolean isReadyToShutdown() {
156 		return !hasOutstandingReplies() && !hasOutstandingResponses();
157 	}
158 
159 	/*
160 	 * Wrapper for ReplyHandler that decrements a counter whenever
161 	 * a complete message has been received. Intercepts calls to 
162 	 * the real ReplyHandler from the application to make this
163 	 * book-keeping possible.
164 	 */
165 	private class ReplyHandlerWrapper implements ReplyHandler {
166 
167 		private final ReplyHandler target;
168 		
169 		private ReplyHandlerWrapper(ReplyHandler target) {
170 			this.target = target;
171 			incrementOutstandingReplyCount();
172 		}
173 		
174 		public void receivedANS(Message message) {
175 			target.receivedANS(message);			
176 		}
177 		
178 		public void receivedNUL() {
179 			decrementOutstandingReplyCount();
180 			target.receivedNUL();
181 		}
182 		
183 		public void receivedERR(Message message) {
184 			decrementOutstandingReplyCount();
185 			target.receivedERR(message);
186 		}
187 		
188 		public void receivedRPY(Message message) {
189 			decrementOutstandingReplyCount();
190 			target.receivedRPY(message);
191 		}
192 	}
193 	
194 	private class ReplyWrapper implements Reply {
195 		
196 		private final Reply target;
197 		
198 		private ReplyWrapper(Reply target) {
199 			this.target = target;
200 			incrementOutstandingResponseCount();
201 		}
202 		
203 		public MessageBuilder createMessageBuilder() {
204 			return target.createMessageBuilder();
205 		}
206 		
207 		public void sendANS(Message message) {
208 			target.sendANS(message);			
209 		}
210 		
211 		public void sendNUL() {
212 			decrementOutstandingResponseCount();
213 			target.sendNUL();
214 		}
215 		
216 		public void sendERR(Message message) {
217 			decrementOutstandingResponseCount();
218 			target.sendERR(message);
219 		}
220 		
221 		public void sendRPY(Message message) {
222 			decrementOutstandingResponseCount();
223 			target.sendRPY(message);
224 		}
225 	}
226 	
227 	private static interface State {
228 		
229 		void checkCondition();
230 		
231 		void sendMessage(Message message, ReplyHandler listener);
232 		
233 		void closeInitiated(CloseChannelCallback callback);
234 		
235 		void closeRequested(CloseChannelRequest request);
236 		
237 		void messageReceived(Message message, Reply handler);
238 		
239 	}
240 	
241 	private static abstract class AbstractState implements State {
242 		
243 		public void checkCondition() {
244 			// nothing to check
245 		}
246 		
247 		public void sendMessage(Message message, ReplyHandler listener) {
248 			throw new IllegalStateException();
249 		}
250 		
251 		public void closeInitiated(CloseChannelCallback callback) {
252 			throw new IllegalStateException();
253 		}
254 		
255 		public void closeRequested(CloseChannelRequest request) {
256 			throw new IllegalStateException();
257 		}
258 		
259 		public void messageReceived(Message message, Reply handler) {
260 			throw new IllegalStateException();
261 		}
262 	}
263 	
264 	private class Alive extends AbstractState {
265 		
266 		@Override
267 		public void sendMessage(final Message message, final ReplyHandler listener) {
268 			session.sendMessage(channelNumber, message, listener);
269 		}
270 		
271 		@Override
272 		public void messageReceived(Message message, Reply handler) {
273 			channelHandler.messageReceived(message, handler);
274 		}
275 		
276 		@Override
277 		public void closeInitiated(CloseChannelCallback callback) {
278 			setState(new CloseInitiated(callback));
279 		}
280 		
281 		@Override
282 		public void closeRequested(CloseChannelRequest request) {
283 			setState(new CloseRequested(request));
284 		}
285 		
286 	}
287 	
288 	private class CloseInitiated extends AbstractState {
289 		
290 		private final CloseChannelCallback callback;
291 		
292 		private CloseInitiated(CloseChannelCallback callback) {
293 			this.callback = callback;
294 		}
295 		
296 		@Override
297 		public void messageReceived(Message message, Reply handler) {
298 			channelHandler.messageReceived(message, handler);
299 		}
300 		
301 		@Override
302 		public void checkCondition() {
303 			if (isReadyToShutdown()) {
304 				session.requestChannelClose(channelNumber, new CloseChannelCallback() {
305 					public void closeDeclined(int code, String message) {
306 						callback.closeDeclined(code, message);
307 						setState(new Alive());
308 					}
309 					public void closeAccepted() {
310 						channelClosed();
311 						callback.closeAccepted();
312 						setState(new Dead());
313 					}
314 				});
315 			}
316 		}
317 		
318 		/*
319 		 * If we receive a close request in this state, we accept the close
320 		 * request immediately without consulting the application. The
321 		 * reasoning is that the application already requested to close
322 		 * the channel, so it makes no sense to let it change that 
323 		 * decision.
324 		 */
325 		@Override
326 		public void closeRequested(CloseChannelRequest request) {
327 			callback.closeAccepted();
328 			channelClosed();
329 			request.accept();
330 			setState(new Dead());
331 		}
332 		
333 	}
334 	
335 	private class CloseRequested extends AbstractState {
336 		
337 		private final CloseChannelRequest request;
338 		
339 		private CloseRequested(CloseChannelRequest request) {
340 			this.request = request;
341 		}
342 		
343 		@Override
344 		public void messageReceived(Message message, Reply handler) {
345 			channelHandler.messageReceived(message, handler);
346 		}
347 		
348 		@Override
349 		public void checkCondition() {
350 			if (isReadyToShutdown()) {
351 				channelHandler.channelCloseRequested(new CloseChannelRequest() {
352 					public void reject() {
353 						setState(new Alive());
354 						request.reject();
355 					}
356 					public void accept() {
357 						setState(new Dead());
358 						request.accept();
359 						// TODO: handle exceptions
360 						channelHandler.channelClosed();
361 					}
362 				});
363 			}
364 		}
365 	}
366 	
367 	private class Dead extends AbstractState {
368 		// dead is dead ;)
369 	}
370 	
371 }