1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal;
17
18 import java.nio.ByteBuffer;
19 import java.util.HashMap;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23
24 import net.sf.beep4j.Message;
25 import net.sf.beep4j.ProtocolException;
26 import net.sf.beep4j.internal.DataHeader.ANSHeader;
27 import net.sf.beep4j.internal.message.DefaultMessageParser;
28 import net.sf.beep4j.internal.message.MessageParser;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33
34
35
36
37
38
39 public class MessageAssembler implements FrameHandler {
40
41 private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
42
43 private final MessageHandler handler;
44
45 private State state;
46
47 public MessageAssembler(MessageHandler handler) {
48 this.handler = handler;
49 }
50
51
52
53
54 public void handleFrame(Frame frame) {
55 LOG.debug("got frame: " + frame.getHeader());
56
57 if (state == null) {
58 MessageType type = frame.getHeader().getType();
59 if (MessageType.ANS == type || MessageType.NUL == type) {
60 LOG.debug("moving to ANS state");
61 state = new AnsState();
62 } else {
63 LOG.debug("moving to normal state");
64 state = new NormalState();
65 }
66 }
67
68
69 state.append(frame, handler);
70 }
71
72
73
74
75 protected Message createMessage(List<Frame> frames) {
76 if (frames.size() == 0) {
77 throw new IllegalArgumentException("cannot create message from 0 fragments");
78 }
79
80 LOG.debug("creating message from " + frames.size() + " frames");
81
82 int total = 0;
83 for (Frame frame : frames) {
84 long check = total + frame.getSize();
85 if (check > Integer.MAX_VALUE) {
86 throw new ProtocolException("total message length is longer "
87 + "than supported: " + check);
88 }
89 total += frame.getPayload().remaining();
90 }
91
92 LOG.debug("total payload size is " + total);
93
94 ByteBuffer buffer = ByteBuffer.allocate(total);
95 for (Frame frame : frames) {
96 buffer.put(frame.getPayload());
97 }
98 buffer.flip();
99
100 MessageParser parser = new DefaultMessageParser();
101 return parser.parse(buffer);
102 }
103
104 protected void receive(MessageType type, int channelNumber, int messageNumber, Message message) {
105 if (MessageType.ERR == type) {
106 handler.receiveERR(channelNumber, messageNumber, message);
107 } else if (MessageType.MSG == type) {
108 handler.receiveMSG(channelNumber, messageNumber, message);
109 } else if (MessageType.RPY == type) {
110 handler.receiveRPY(channelNumber, messageNumber, message);
111 } else {
112 throw new IllegalArgumentException("unkown type: " + type);
113 }
114 }
115
116 protected void receive(int channelNumber, int messageNumber, int answerNumber, Message message) {
117 handler.receiveANS(channelNumber, messageNumber, answerNumber, message);
118 }
119
120 private static interface State {
121 void append(Frame frame, MessageHandler handler);
122 }
123
124 private class NormalState implements State {
125 private List<Frame> fragments;
126 private DataHeader last;
127
128 private NormalState() {
129 this.fragments = new LinkedList<Frame>();
130 }
131
132 private boolean hasPreviousFrame() {
133 return last != null;
134 }
135
136 public void append(Frame frame, MessageHandler handler) {
137 DataHeader header = (DataHeader) frame.getHeader();
138 MessageType type = header.getType();
139
140 if (hasPreviousFrame()) {
141 validateMessageNumber(header);
142 validateMatchingFragmentTypes(last.getType(), type);
143 }
144
145 fragments.add(frame);
146
147 if (header.isIntermediate()) {
148 last = (DataHeader) frame.getHeader();
149 } else {
150 LOG.debug("got complete message with " + fragments.size() + " fragments");
151 last = null;
152 List<Frame> copy = new LinkedList<Frame>(fragments);
153 fragments.clear();
154 state = null;
155 receive(type, frame.getChannelNumber(), frame.getMessageNumber(), createMessage(copy));
156 }
157 }
158
159
160
161
162
163
164
165
166
167
168 private void validateMessageNumber(DataHeader header) {
169 if (last.getMessageNumber() != header.getMessageNumber()) {
170 throw new ProtocolException("message number for fragments does not match: was "
171 + header.getMessageNumber() + ", should be "
172 + last.getMessageNumber());
173 }
174 }
175
176
177
178
179
180
181
182
183
184
185
186 private void validateMatchingFragmentTypes(MessageType last, MessageType current) {
187 if (MessageType.ERR == current
188 || MessageType.MSG == current
189 || MessageType.RPY == current) {
190 if (!last.equals(current)) {
191 throw new ProtocolException("header type does not match: expected "
192 + last + " but was " + current);
193 }
194 }
195 }
196 }
197
198 private class AnsState implements State {
199 private Map<Integer, List<Frame>> fragments;
200 private int messageNumber = -1;
201
202 private AnsState() {
203 this.fragments = new HashMap<Integer, List<Frame>>();
204 }
205
206 public void append(Frame frame, MessageHandler handler) {
207 MessageType type = frame.getType();
208
209 if (messageNumber == -1) {
210 messageNumber = frame.getMessageNumber();
211 } else {
212 validateMessageNumber(frame.getHeader());
213 }
214
215 if (MessageType.ANS == type) {
216 ANSHeader header = (ANSHeader) frame.getHeader();
217 List<Frame> frames = fragments.get(header.getAnswerNumber());
218 if (frames == null) {
219 frames = new LinkedList<Frame>();
220 fragments.put(header.getAnswerNumber(), frames);
221 }
222 frames.add(frame);
223 if (!header.isIntermediate()) {
224 fragments.remove(header.getAnswerNumber());
225 receive(frame.getChannelNumber(),
226 frame.getMessageNumber(),
227 header.getAnswerNumber(),
228 createMessage(frames));
229 }
230
231 } else if (MessageType.NUL == type) {
232 if (hasUnfinishedAnsMessages()) {
233
234
235
236
237
238
239
240
241
242 throw new ProtocolException("unfinished ANS messages");
243 } else if (frame.isIntermediate()) {
244 throw new ProtocolException("NUL reply's continuation indicator is '*'");
245 } else if (frame.getSize() != 0) {
246 throw new ProtocolException("NUL reply's payload size is non-zero ("
247 + frame.getSize() + ")");
248 }
249
250 fragments.clear();
251 state = null;
252 handler.receiveNUL(frame.getChannelNumber(), frame.getMessageNumber());
253
254 } else {
255 throw new ProtocolException("expected ANS or NUL message, was " + type.name());
256 }
257 }
258
259
260
261
262
263
264
265
266
267
268 private void validateMessageNumber(DataHeader current) {
269 if (messageNumber != current.getMessageNumber()) {
270 throw new ProtocolException("message number for fragments does not match: was "
271 + current.getMessageNumber() + ", should be "
272 + messageNumber);
273 }
274 }
275
276 private boolean hasUnfinishedAnsMessages() {
277 return fragments.size() > 0;
278 }
279
280 }
281
282 }