View Javadoc

1   /*
2    *  Copyright 2006 Simon Raess
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.beep4j.internal;
17  
18  import java.nio.ByteBuffer;
19  import java.util.HashMap;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.Map;
23  
24  import net.sf.beep4j.Message;
25  import net.sf.beep4j.ProtocolException;
26  import net.sf.beep4j.internal.DataHeader.ANSHeader;
27  import net.sf.beep4j.internal.message.DefaultMessageParser;
28  import net.sf.beep4j.internal.message.MessageParser;
29  
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  /**
34   * MessageAssembler assembles fragmented frames into a Message.
35   * The assembled Messages are passed to a MessageHandler.
36   * 
37   * @author Simon Raess
38   */
39  public class MessageAssembler implements FrameHandler {
40  	
41  	private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
42  	
43  	private final MessageHandler handler;
44  	
45  	private State state;
46  
47  	public MessageAssembler(MessageHandler handler) {
48  		this.handler = handler;
49  	}
50  
51  	
52  	// --> start of FrameHandler methods <--
53  	
54  	public void handleFrame(Frame frame) {
55  		LOG.debug("got frame: " + frame.getHeader());
56  		
57  		if (state == null) {
58  			MessageType type = frame.getHeader().getType();
59  			if (MessageType.ANS == type || MessageType.NUL == type) {
60  				LOG.debug("moving to ANS state");
61  				state = new AnsState();
62  			} else {
63  				LOG.debug("moving to normal state");
64  				state = new NormalState();
65  			}
66  		}
67  		
68  		// pass on to the state
69  		state.append(frame, handler);
70  	}
71  	
72  	// --> end of FrameHandler methods <--
73  	
74  	
75  	protected Message createMessage(List<Frame> frames) {
76  		if (frames.size() == 0) {
77  			throw new IllegalArgumentException("cannot create message from 0 fragments");
78  		}
79  		
80  		LOG.debug("creating message from " + frames.size() + " frames");
81  		
82  		int total = 0;
83  		for (Frame frame : frames) {
84  			long check = total + frame.getSize();
85  			if (check > Integer.MAX_VALUE) {
86  				throw new ProtocolException("total message length is longer "
87  						+ "than supported: " + check);
88  			}
89  			total += frame.getPayload().remaining();
90  		}
91  		
92  		LOG.debug("total payload size is " + total);
93  		
94  		ByteBuffer buffer = ByteBuffer.allocate(total);
95  		for (Frame frame : frames) {
96  			buffer.put(frame.getPayload());
97  		}
98  		buffer.flip();
99  		
100 		MessageParser parser = new DefaultMessageParser();
101 		return parser.parse(buffer);
102 	}
103 
104 	protected void receive(MessageType type, int channelNumber, int messageNumber, Message message) {
105 		if (MessageType.ERR == type) {
106 			handler.receiveERR(channelNumber, messageNumber, message);
107 		} else if (MessageType.MSG == type) {
108 			handler.receiveMSG(channelNumber, messageNumber, message);
109 		} else if (MessageType.RPY == type) {
110 			handler.receiveRPY(channelNumber, messageNumber, message);
111 		} else {
112 			throw new IllegalArgumentException("unkown type: " + type);
113 		}
114 	}
115 	
116 	protected void receive(int channelNumber, int messageNumber, int answerNumber, Message message) {
117 		handler.receiveANS(channelNumber, messageNumber, answerNumber, message);
118 	}
119 	
120 	private static interface State {
121 		void append(Frame frame, MessageHandler handler);
122 	}
123 	
124 	private class NormalState implements State {
125 		private List<Frame> fragments;
126 		private DataHeader last;
127 		
128 		private NormalState() { 
129 			this.fragments = new LinkedList<Frame>();
130 		}
131 		
132 		private boolean hasPreviousFrame() {
133 			return last != null;
134 		}
135 		
136 		public void append(Frame frame, MessageHandler handler) {
137 			DataHeader header = (DataHeader) frame.getHeader();
138 			MessageType type = header.getType();
139 			
140 			if (hasPreviousFrame()) {
141 				validateMessageNumber(header);
142 				validateMatchingFragmentTypes(last.getType(), type);
143 			}
144 			
145 			fragments.add(frame);
146 			
147 			if (header.isIntermediate()) {
148 				last = (DataHeader) frame.getHeader();
149 			} else {
150 				LOG.debug("got complete message with " + fragments.size() + " fragments");
151 				last = null;
152 				List<Frame> copy = new LinkedList<Frame>(fragments);
153 				fragments.clear();
154 				state = null;
155 				receive(type, frame.getChannelNumber(), frame.getMessageNumber(), createMessage(copy));
156 			}
157 		}
158 
159 		/*
160 		 * Validation of sequencing according to the BEEP specification section
161 		 * 2.2.1.1.
162 		 * 
163 		 * A frame is poorly formed, if the continuation indicator of the 
164 		 * previous frame received on the same channel 
165 		 * was intermediate ("*"), and its message number isn't identical to this frame's 
166 		 * message number.
167 		 */
168 		private void validateMessageNumber(DataHeader header) {
169 			if (last.getMessageNumber() != header.getMessageNumber()) {
170 				throw new ProtocolException("message number for fragments does not match: was "
171 						+ header.getMessageNumber() + ", should be " 
172 						+ last.getMessageNumber());
173 			}
174 		}
175 		
176 		/*
177 		 * Validation of sequencing according to the BEEP specification section
178 		 * 2.2.1.1.
179 		 * 
180 		 * A frame is poorly formed if the header starts with "MSG", "RPY", "ERR", 
181 		 * or "ANS", and refers to a message number for which at least one other 
182 		 * frame has been received, and the three-character keyword starting this 
183 		 * frame and the immediately-previous received frame for this message 
184 		 * number are not identical
185 		 */
186 		private void validateMatchingFragmentTypes(MessageType last, MessageType current) {
187 			if (MessageType.ERR == current
188 					|| MessageType.MSG == current
189 					|| MessageType.RPY == current) {
190 				if (!last.equals(current)) {
191 					throw new ProtocolException("header type does not match: expected "
192 							+ last + " but was " + current);
193 				}
194 			}
195 		}
196 	}
197 	
198 	private class AnsState implements State {
199 		private Map<Integer, List<Frame>> fragments;
200 		private int messageNumber = -1;
201 		
202 		private AnsState() {
203 			this.fragments = new HashMap<Integer, List<Frame>>();
204 		}
205 		
206 		public void append(Frame frame, MessageHandler handler) {
207 			MessageType type = frame.getType();
208 			
209 			if (messageNumber == -1) {
210 				messageNumber = frame.getMessageNumber();
211 			} else {
212 				validateMessageNumber(frame.getHeader());
213 			}
214 			
215 			if (MessageType.ANS == type) {
216 				ANSHeader header = (ANSHeader) frame.getHeader();
217 				List<Frame> frames = fragments.get(header.getAnswerNumber());
218 				if (frames == null) {
219 					frames = new LinkedList<Frame>();
220 					fragments.put(header.getAnswerNumber(), frames);
221 				}
222 				frames.add(frame);
223 				if (!header.isIntermediate()) {
224 					fragments.remove(header.getAnswerNumber());
225 					receive(frame.getChannelNumber(), 
226 							frame.getMessageNumber(), 
227 							header.getAnswerNumber(),
228 							createMessage(frames));				
229 				}
230 				
231 			} else if (MessageType.NUL == type) {
232 				if (hasUnfinishedAnsMessages()) {
233 					// Validation of sequencing according to the BEEP specification section
234 					// 2.2.1.1.
235 					//  
236 					// A frame is poorly formed if the header starts with "NUL", and refers to 
237 					// a message number for which at least one other frame has been received, 
238 					// and the keyword of of the immediately-previous received frame for 
239 					// this reply isn't "ANS".
240 					
241 					// TODO: use proper exceptions
242 					throw new ProtocolException("unfinished ANS messages");
243 				} else if (frame.isIntermediate()) {
244 					throw new ProtocolException("NUL reply's continuation indicator is '*'");
245 				} else if (frame.getSize() != 0) {
246 					throw new ProtocolException("NUL reply's payload size is non-zero ("
247 							+ frame.getSize() + ")");
248 				}
249 				
250 				fragments.clear();
251 				state = null;
252 				handler.receiveNUL(frame.getChannelNumber(), frame.getMessageNumber());
253 				
254 			} else {
255 				throw new ProtocolException("expected ANS or NUL message, was " + type.name());
256 			}			
257 		}
258 
259 		/*
260 		 * Validation of sequencing according to the BEEP specification section
261 		 * 2.2.1.1.
262 		 * 
263 		 * A frame is poorly formed, if the continuation indicator of the 
264 		 * previous frame received on the same channel 
265 		 * was intermediate ("*"), and its message number isn't identical to this frame's 
266 		 * message number.
267 		 */
268 		private void validateMessageNumber(DataHeader current) {
269 			if (messageNumber != current.getMessageNumber()) {
270 				throw new ProtocolException("message number for fragments does not match: was "
271 						+ current.getMessageNumber() + ", should be " 
272 						+ messageNumber);
273 			}
274 		}
275 				
276 		private boolean hasUnfinishedAnsMessages() {
277 			return fragments.size() > 0;
278 		}
279 		
280 	}
281 	
282 }