1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal.profile;
17
18 import java.net.SocketAddress;
19
20 import net.sf.beep4j.Channel;
21 import net.sf.beep4j.ChannelHandler;
22 import net.sf.beep4j.CloseChannelCallback;
23 import net.sf.beep4j.CloseChannelRequest;
24 import net.sf.beep4j.Message;
25 import net.sf.beep4j.MessageBuilder;
26 import net.sf.beep4j.ProfileInfo;
27 import net.sf.beep4j.ProtocolException;
28 import net.sf.beep4j.ReplyHandler;
29 import net.sf.beep4j.Reply;
30 import net.sf.beep4j.SessionHandler;
31 import net.sf.beep4j.internal.CloseCallback;
32 import net.sf.beep4j.internal.DefaultStartSessionRequest;
33 import net.sf.beep4j.internal.SessionManager;
34 import net.sf.beep4j.internal.StartChannelResponse;
35
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39
40
41
42
43
44 public class ChannelManagementProfileImpl implements ChannelHandler, ChannelManagementProfile {
45
46 private static final Logger LOG = LoggerFactory.getLogger(ChannelManagementProfile.class);
47
48 private SessionManager manager;
49
50 private Channel channel;
51
52 private final boolean initiator;
53
54 private final ChannelManagementMessageBuilder builder;
55
56 private final ChannelManagementMessageParser parser;
57
58 public ChannelManagementProfileImpl(boolean initiator) {
59 this.initiator = initiator;
60 this.builder = createChannelManagementMessageBuilder();
61 this.parser = createChannelManagementMessageParser();
62 }
63
64 protected ChannelManagementMessageBuilder createChannelManagementMessageBuilder() {
65 return new SaxMessageBuilder();
66 }
67
68 protected ChannelManagementMessageParser createChannelManagementMessageParser() {
69 return new SaxMessageParser();
70 }
71
72 protected MessageBuilder createMessageBuilder() {
73 MessageBuilder builder = channel.createMessageBuilder();
74 builder.setContentType("application", "beep+xml");
75 builder.setCharsetName("UTF-8");
76 return builder;
77 }
78
79
80
81
82
83
84
85
86
87
88
89
90
91 public void channelStartFailed(int code, String message) {
92 throw new UnsupportedOperationException();
93 }
94
95 public void channelOpened(Channel c) {
96 this.channel = c;
97 }
98
99 public void messageReceived(Message message, final Reply handler) {
100 ChannelManagementRequest r = parser.parseRequest(message);
101 LOG.debug("received request " + r);
102
103 if (r instanceof StartChannelMessage) {
104 StartChannelMessage request = (StartChannelMessage) r;
105 int channelNumber = request.getChannelNumber();
106
107
108 if (initiator && channelNumber % 2 != 0) {
109 LOG.warn("received invalid start channel request: number attribute in <start> element must be "
110 + "odd valued (was=" + channelNumber + ")");
111 handler.sendERR(builder.createError(
112 createMessageBuilder(), 501, "number attribute in <start> element must be odd valued"));
113 } else if (!initiator && channelNumber % 2 != 1) {
114 LOG.warn("received invalid start channel request: number attribute in <start> element must be "
115 + "even valued (was=" + channelNumber + ")");
116 handler.sendERR(builder.createError(
117 createMessageBuilder(), 501, "number attribute in <start> element must be even valued"));
118 } else {
119 handleStartChannelRequest(request, handler);
120 }
121
122 } else if (r instanceof CloseChannelMessage) {
123 final CloseChannelMessage request = (CloseChannelMessage) r;
124
125 if (request.getChannelNumber() == 0) {
126 LOG.info("session close requested");
127 manager.sessionCloseRequested(new CloseCallback() {
128 public void closeDeclined(int code, String message) {
129 LOG.info("close of session declined by framework: "
130 + code + ",'" + message + "'");
131 handler.sendERR(
132 builder.createError(createMessageBuilder(), code, message));
133 }
134
135 public void closeAccepted() {
136 LOG.info("close of session accepted by framework");
137 handler.sendRPY(builder.createOk(createMessageBuilder()));
138 }
139 });
140
141 } else {
142 LOG.info("close of channel " + request.getChannelNumber() + " requested");
143 manager.channelCloseRequested(request.getChannelNumber(), new CloseChannelRequest() {
144 public void reject() {
145 LOG.info("close of channel " + request.getChannelNumber()
146 + " declined by application");
147 handler.sendERR(builder.createError(
148 createMessageBuilder(), 550, "still working"));
149 }
150 public void accept() {
151 LOG.info("close of channel " + request.getChannelNumber()
152 + " accepted by application");
153 handler.sendRPY(builder.createOk(createMessageBuilder()));
154 }
155 });
156 }
157
158 } else {
159 throw new RuntimeException("unexpected code path");
160 }
161 }
162
163 private void handleStartChannelRequest(StartChannelMessage request, Reply handler) {
164 StartChannelResponse response = manager.channelStartRequested(
165 request.getChannelNumber(), request.getProfiles());
166
167 if (response.isCancelled()) {
168 LOG.info("start channel request is cancelled by application: "
169 + response.getCode() + "," + response.getMessage());
170 handler.sendERR(builder.createError(
171 createMessageBuilder(), response.getCode(), response.getMessage()));
172
173 } else {
174 LOG.info("start channel request is accepted by application: "
175 + response.getProfile().getUri());
176 handler.sendRPY(builder.createProfile(
177 createMessageBuilder(), response.getProfile()));
178 }
179 }
180
181 public void channelCloseRequested(CloseChannelRequest request) {
182 throw new UnsupportedOperationException("unexpected code path");
183 }
184
185 public void channelClosed() {
186 this.channel = null;
187 }
188
189
190
191
192
193
194 public ChannelHandler createChannelHandler(SessionManager manager) {
195 this.manager = manager;
196 return this;
197 }
198
199 public boolean connectionEstablished(
200 SocketAddress address,
201 SessionHandler sessionHandler,
202 Reply response) {
203 DefaultStartSessionRequest request = new DefaultStartSessionRequest(!initiator);
204 sessionHandler.connectionEstablished(request);
205
206 if (request.isCancelled()) {
207 response.sendERR(createError(request.getReplyCode(), request.getMessage()));
208 } else {
209 response.sendRPY(createGreeting(request.getProfiles()));
210 }
211
212 return !request.isCancelled();
213 }
214
215 public Message createGreeting(String[] profiles) {
216 return builder.createGreeting(createMessageBuilder(), profiles);
217 }
218
219 public Message createError(int code, String diagnostics) {
220 return builder.createError(createMessageBuilder(), code, diagnostics);
221 }
222
223 public Greeting receivedGreeting(Message message) {
224 return parser.parseGreeting(message);
225 }
226
227 public BEEPError receivedError(Message message) {
228 return parser.parseError(message);
229 }
230
231 public void startChannel(int channelNumber, ProfileInfo[] infos, final StartChannelCallback callback) {
232 Message message = builder.createStart(createMessageBuilder(), channelNumber, infos);
233 channel.sendMessage(message, new ReplyHandler() {
234
235 public void receivedRPY(Message message) {
236 ProfileInfo profile = parser.parseProfile(message);
237 callback.channelCreated(profile);
238 }
239
240 public void receivedERR(Message message) {
241 BEEPError error = parser.parseError(message);
242 callback.channelFailed(error.getCode(), error.getMessage());
243 }
244
245 public void receivedNUL() {
246 throw new ProtocolException("message type NUL is not a valid response");
247 }
248
249 public void receivedANS(Message message) {
250 throw new ProtocolException("message type ANS is not a valid response");
251 }
252 });
253 }
254
255 public void closeChannel(int channelNumber, final CloseChannelCallback callback) {
256 Message message = builder.createClose(createMessageBuilder(), channelNumber, 200);
257 channel.sendMessage(message, new ReplyHandler() {
258
259 public void receivedRPY(Message message) {
260 parser.parseOk(message);
261 callback.closeAccepted();
262 }
263
264 public void receivedERR(Message message) {
265 BEEPError error = parser.parseError(message);
266 callback.closeDeclined(error.getCode(), error.getMessage());
267 }
268
269 public void receivedANS(Message message) {
270 throw new UnsupportedOperationException();
271 }
272
273 public void receivedNUL() {
274 throw new UnsupportedOperationException();
275 }
276
277 });
278 }
279
280 public void closeSession(final CloseCallback callback) {
281 Message message = builder.createClose(createMessageBuilder(), 0, 200);
282 channel.sendMessage(message, new ReplyHandler() {
283
284 public void receivedRPY(Message message) {
285 parser.parseOk(message);
286 callback.closeAccepted();
287 }
288
289 public void receivedERR(Message message) {
290 BEEPError error = parser.parseError(message);
291 callback.closeDeclined(error.getCode(), error.getMessage());
292 }
293
294 public void receivedANS(Message message) {
295 throw new ProtocolException("ANS message not valid response for close request");
296 }
297
298 public void receivedNUL() {
299 throw new ProtocolException("NUL message not valid response for close request");
300 }
301
302 });
303 }
304
305
306
307 }