1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal;
17
18 import net.sf.beep4j.Channel;
19 import net.sf.beep4j.ChannelHandler;
20 import net.sf.beep4j.CloseChannelCallback;
21 import net.sf.beep4j.CloseChannelRequest;
22 import net.sf.beep4j.Message;
23 import net.sf.beep4j.MessageBuilder;
24 import net.sf.beep4j.Reply;
25 import net.sf.beep4j.ReplyHandler;
26 import net.sf.beep4j.Session;
27 import net.sf.beep4j.internal.message.DefaultMessageBuilder;
28 import net.sf.beep4j.internal.util.Assert;
29
30 class ChannelImpl implements Channel, ChannelHandler, InternalChannel {
31
32 private final InternalSession session;
33
34 private final String profile;
35
36 private final int channelNumber;
37
38 private ChannelHandler channelHandler;
39
40 private State state = new Alive();
41
42
43
44
45
46 private int outstandingReplyCount;
47
48
49
50
51
52 private int outstandingResponseCount;
53
54 public ChannelImpl(
55 InternalSession session,
56 String profile,
57 int channelNumber) {
58 this.session = session;
59 this.profile = profile;
60 this.channelNumber = channelNumber;
61 }
62
63 public ChannelHandler initChannel(ChannelHandler channelHandler) {
64 Assert.notNull("channelHandler", channelHandler);
65 this.channelHandler = channelHandler;
66 return this;
67 }
68
69 public boolean isAlive() {
70 return state instanceof Alive;
71 }
72
73 public boolean isDead() {
74 return state instanceof Dead;
75 }
76
77 public boolean isShuttingDown() {
78 return !isAlive() && !isDead();
79 }
80
81 public String getProfile() {
82 return profile;
83 }
84
85 public Session getSession() {
86 return session;
87 }
88
89 public MessageBuilder createMessageBuilder() {
90 return new DefaultMessageBuilder();
91 }
92
93 protected void setState(State state) {
94 this.state = state;
95 this.state.checkCondition();
96 }
97
98 public void sendMessage(Message message, ReplyHandler reply) {
99 Assert.notNull("message", message);
100 Assert.notNull("listener", reply);
101 state.sendMessage(message, new ReplyHandlerWrapper(reply));
102 }
103
104 public void close(CloseChannelCallback callback) {
105 Assert.notNull("callback", callback);
106 state.closeInitiated(callback);
107 }
108
109 public void channelClosed() {
110 channelHandler.channelClosed();
111 }
112
113 public void channelStartFailed(int code, String message) {
114 channelHandler.channelStartFailed(code, message);
115 }
116
117 public void channelOpened(Channel c) {
118 channelHandler.channelOpened(this);
119 }
120
121 public void messageReceived(Message message, Reply reply) {
122 state.messageReceived(message, new ReplyWrapper(reply));
123 }
124
125 public void channelCloseRequested(CloseChannelRequest request) {
126 state.closeRequested(request);
127 }
128
129 private synchronized void incrementOutstandingReplyCount() {
130 outstandingReplyCount++;
131 }
132
133 private synchronized void decrementOutstandingReplyCount() {
134 outstandingReplyCount--;
135 state.checkCondition();
136 }
137
138 private synchronized boolean hasOutstandingReplies() {
139 return outstandingReplyCount > 0;
140 }
141
142 private synchronized void incrementOutstandingResponseCount() {
143 outstandingResponseCount++;
144 }
145
146 private synchronized void decrementOutstandingResponseCount() {
147 outstandingResponseCount--;
148 state.checkCondition();
149 }
150
151 private synchronized boolean hasOutstandingResponses() {
152 return outstandingResponseCount > 0;
153 }
154
155 private synchronized boolean isReadyToShutdown() {
156 return !hasOutstandingReplies() && !hasOutstandingResponses();
157 }
158
159
160
161
162
163
164
165 private class ReplyHandlerWrapper implements ReplyHandler {
166
167 private final ReplyHandler target;
168
169 private ReplyHandlerWrapper(ReplyHandler target) {
170 this.target = target;
171 incrementOutstandingReplyCount();
172 }
173
174 public void receivedANS(Message message) {
175 target.receivedANS(message);
176 }
177
178 public void receivedNUL() {
179 decrementOutstandingReplyCount();
180 target.receivedNUL();
181 }
182
183 public void receivedERR(Message message) {
184 decrementOutstandingReplyCount();
185 target.receivedERR(message);
186 }
187
188 public void receivedRPY(Message message) {
189 decrementOutstandingReplyCount();
190 target.receivedRPY(message);
191 }
192 }
193
194 private class ReplyWrapper implements Reply {
195
196 private final Reply target;
197
198 private ReplyWrapper(Reply target) {
199 this.target = target;
200 incrementOutstandingResponseCount();
201 }
202
203 public MessageBuilder createMessageBuilder() {
204 return target.createMessageBuilder();
205 }
206
207 public void sendANS(Message message) {
208 target.sendANS(message);
209 }
210
211 public void sendNUL() {
212 decrementOutstandingResponseCount();
213 target.sendNUL();
214 }
215
216 public void sendERR(Message message) {
217 decrementOutstandingResponseCount();
218 target.sendERR(message);
219 }
220
221 public void sendRPY(Message message) {
222 decrementOutstandingResponseCount();
223 target.sendRPY(message);
224 }
225 }
226
227 private static interface State {
228
229 void checkCondition();
230
231 void sendMessage(Message message, ReplyHandler listener);
232
233 void closeInitiated(CloseChannelCallback callback);
234
235 void closeRequested(CloseChannelRequest request);
236
237 void messageReceived(Message message, Reply handler);
238
239 }
240
241 private static abstract class AbstractState implements State {
242
243 public void checkCondition() {
244
245 }
246
247 public void sendMessage(Message message, ReplyHandler listener) {
248 throw new IllegalStateException();
249 }
250
251 public void closeInitiated(CloseChannelCallback callback) {
252 throw new IllegalStateException();
253 }
254
255 public void closeRequested(CloseChannelRequest request) {
256 throw new IllegalStateException();
257 }
258
259 public void messageReceived(Message message, Reply handler) {
260 throw new IllegalStateException();
261 }
262 }
263
264 private class Alive extends AbstractState {
265
266 @Override
267 public void sendMessage(final Message message, final ReplyHandler listener) {
268 session.sendMessage(channelNumber, message, listener);
269 }
270
271 @Override
272 public void messageReceived(Message message, Reply handler) {
273 channelHandler.messageReceived(message, handler);
274 }
275
276 @Override
277 public void closeInitiated(CloseChannelCallback callback) {
278 setState(new CloseInitiated(callback));
279 }
280
281 @Override
282 public void closeRequested(CloseChannelRequest request) {
283 setState(new CloseRequested(request));
284 }
285
286 }
287
288 private class CloseInitiated extends AbstractState {
289
290 private final CloseChannelCallback callback;
291
292 private CloseInitiated(CloseChannelCallback callback) {
293 this.callback = callback;
294 }
295
296 @Override
297 public void messageReceived(Message message, Reply handler) {
298 channelHandler.messageReceived(message, handler);
299 }
300
301 @Override
302 public void checkCondition() {
303 if (isReadyToShutdown()) {
304 session.requestChannelClose(channelNumber, new CloseChannelCallback() {
305 public void closeDeclined(int code, String message) {
306 callback.closeDeclined(code, message);
307 setState(new Alive());
308 }
309 public void closeAccepted() {
310 channelClosed();
311 callback.closeAccepted();
312 setState(new Dead());
313 }
314 });
315 }
316 }
317
318
319
320
321
322
323
324
325 @Override
326 public void closeRequested(CloseChannelRequest request) {
327 callback.closeAccepted();
328 channelClosed();
329 request.accept();
330 setState(new Dead());
331 }
332
333 }
334
335 private class CloseRequested extends AbstractState {
336
337 private final CloseChannelRequest request;
338
339 private CloseRequested(CloseChannelRequest request) {
340 this.request = request;
341 }
342
343 @Override
344 public void messageReceived(Message message, Reply handler) {
345 channelHandler.messageReceived(message, handler);
346 }
347
348 @Override
349 public void checkCondition() {
350 if (isReadyToShutdown()) {
351 channelHandler.channelCloseRequested(new CloseChannelRequest() {
352 public void reject() {
353 setState(new Alive());
354 request.reject();
355 }
356 public void accept() {
357 setState(new Dead());
358 request.accept();
359
360 channelHandler.channelClosed();
361 }
362 });
363 }
364 }
365 }
366
367 private class Dead extends AbstractState {
368
369 }
370
371 }