View Javadoc

1   /*
2    *  Copyright 2006 Simon Raess
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.beep4j.internal.tcp;
17  
18  import java.nio.ByteBuffer;
19  import java.nio.charset.Charset;
20  import java.util.LinkedList;
21  
22  import net.sf.beep4j.Message;
23  import net.sf.beep4j.ProtocolException;
24  import net.sf.beep4j.internal.DataHeader;
25  import net.sf.beep4j.internal.Frame;
26  import net.sf.beep4j.internal.MessageType;
27  import net.sf.beep4j.internal.DataHeader.ANSHeader;
28  import net.sf.beep4j.internal.util.Assert;
29  import net.sf.beep4j.transport.Transport;
30  
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  public class DefaultChannelController implements ChannelController {
35  	
36  	private static final Logger LOG = LoggerFactory.getLogger(ChannelController.class);
37  	
38  	private static final Charset ASCII_CHARSET = Charset.forName("US-ASCII");
39  
40  	public static final int MINIMUM_FRAME_SIZE = 1;
41  	
42  	private final int channel;
43  	
44  	private final SlidingWindow window;
45  	
46  	private final SlidingWindow senderWindow;
47  	
48  	private final LinkedList<Frame> frames = new LinkedList<Frame>();
49  	
50  	private final Transport transport;
51  	
52  	private long seqno;
53  	
54  	public DefaultChannelController(Transport transport, int channel, int window) {
55  		Assert.notNull("transport", transport);
56  		this.transport = transport;
57  		this.channel = channel;
58  		this.senderWindow = new SlidingWindow(window);
59  		this.window = new SlidingWindow(window);
60  	}
61  	
62  	public void updateSendWindow(long ackno, int size) {
63  		LOG.debug("update send window: ackno=" + ackno + ",window=" + size);
64  		senderWindow.slide(ackno, size);
65  		sendFrames(transport);
66  	}
67  	
68  	public void sendANS(int messageNumber, int answerNumber, Message message) {
69  		LOG.debug("sendANS to message " + messageNumber + " with answer number "
70  				+ answerNumber + " on channel " + channel);
71  		ByteBuffer buffer = message.asByteBuffer();
72  		DataHeader header = new ANSHeader(
73  				channel, messageNumber, false, 
74  				seqno, 
75  				buffer.remaining(), answerNumber);
76  
77  		seqno += buffer.remaining();
78  		
79  		Frame frame = new Frame(header, buffer);
80  		enqueueFrame(frame);
81  		sendFrames(transport);
82  	}
83  	
84  	public void sendERR(int messageNumber, Message message) {
85  		LOG.debug("sendERR to message " + messageNumber + " on channel " + channel);
86  		ByteBuffer buffer = message.asByteBuffer();
87  		DataHeader header = new DataHeader(
88  				MessageType.ERR,
89  				channel, messageNumber, false, 
90  				seqno, buffer.remaining());
91  		
92  		seqno += buffer.remaining();
93  		
94  		Frame frame = new Frame(header, buffer);
95  		enqueueFrame(frame);
96  		sendFrames(transport);
97  	}
98  	
99  	public void sendMSG(int messageNumber, Message message) {
100 		LOG.debug("sendMSG with message number " + messageNumber + " on channel " + channel);
101 		ByteBuffer buffer = message.asByteBuffer();
102 		DataHeader header = new DataHeader(
103 				MessageType.MSG,
104 				channel, messageNumber, false, 
105 				seqno, buffer.remaining());
106 		
107 		seqno += buffer.remaining();
108 		
109 		Frame frame = new Frame(header, buffer);
110 		enqueueFrame(frame);
111 		sendFrames(transport);
112 	}
113 	
114 	public void sendNUL(int messageNumber) {
115 		LOG.debug("sendNUL to message " + messageNumber + " on channel " + channel);
116 		DataHeader header = new DataHeader(
117 				MessageType.NUL,
118 				channel, messageNumber, false, 
119 				seqno, 0);
120 		
121 		Frame frame = new Frame(header, ByteBuffer.allocate(0));
122 		enqueueFrame(frame);
123 		sendFrames(transport);
124 	}
125 	
126 	public void sendRPY(int messageNumber, Message message) {
127 		LOG.debug("sendRPY to message " + messageNumber + " on channel " + channel);
128 		ByteBuffer buffer = message.asByteBuffer();
129 		DataHeader header = new DataHeader(
130 				MessageType.RPY,
131 				channel, messageNumber, false, 
132 				seqno, buffer.remaining());
133 		
134 		seqno += buffer.remaining();
135 		
136 		Frame frame = new Frame(header, buffer);
137 		enqueueFrame(frame);
138 		int count = sendFrames(transport);
139 		LOG.debug("sendRPY caused " + count + " frames to be sent");
140 	}
141 	
142 	long id;
143 	
144 	public synchronized void checkFrame(long seqno, int payloadSize) {
145 		if (seqno != window.getPosition()) {
146 			throw new ProtocolException("sequence number " + seqno + " does not "
147 					+ "match expected sequence number " + window.getPosition());
148 		}
149 		if (window.remaining() < payloadSize) {
150 			throw new ProtocolException("message larger than remaining window size (remaining="
151 					+ window.remaining() + ",payload size=" + payloadSize + ")");
152 		}
153 	}
154 	
155 	public void frameReceived(long seqno, int size) {
156 		if (seqno != window.getPosition()) {
157 			throw new IllegalStateException("sequence number " + seqno + " does not "
158 					+ "match expected sequence number " + window.getPosition());
159 		}
160 		
161 		LOG.debug("frameReceived on channel " + channel + ": seqno=" + seqno + ",size=" + size);
162 		window.moveBy(size);
163 		LOG.debug("receiver window = " + window);
164 		
165 		if (window.remaining() <= 0.5 * window.getWindowSize()) {
166 			long ackno = seqno + size;
167 			int windowSize = window.getWindowSize();
168 			window.slide(ackno, windowSize);
169 			LOG.debug("sending SEQ frame on channel " + channel + ": ackno=" + ackno + ",window=" + windowSize);
170 			LOG.debug("receiver window = " + window);
171 			transport.sendBytes(createSEQFrame(channel, ackno, windowSize));
172 		}
173 	}
174 	
175 	private ByteBuffer createSEQFrame(int channel, long ackno, int window) {
176 		StringBuilder buf = new StringBuilder(SEQHeader.TYPE);
177 		buf.append(" ");
178 		buf.append(channel);
179 		buf.append(" ");
180 		buf.append(ackno);
181 		buf.append(" ");
182 		buf.append(window);
183 		buf.append("\r\n");
184 		return ASCII_CHARSET.encode(buf.toString());
185 	}
186 
187 	private void enqueueFrame(Frame frame) {
188 		frames.addLast(frame);
189 	}
190 	
191 	protected int sendFrames(Transport transport) {
192 		int count = 0;
193 		Frame frame;
194 		
195 		while ((frame = nextFrame()) != null) {
196 			LOG.debug("send frame " + frame.getHeader());
197 			senderWindow.moveBy(frame.getSize());
198 			frame.send(transport);
199 			LOG.debug("sender window = " + senderWindow);
200 			count++;
201 		}
202 		
203 		return count;
204 	}
205 	
206 	private Frame nextFrame() {
207 		if (frames.isEmpty()) {
208 			return null;
209 		} else {
210 			Frame frame = frames.removeFirst();
211 			
212 			if (frame.getSize() <= senderWindow.remaining()) {
213 				LOG.debug("sending frame unchanged (channel=" + channel + ")");
214 				if (frames.isEmpty()) {
215 					LOG.debug("sending last frame in buffer (channel=" + channel + ")");
216 				}
217 				return frame;
218 			} else if (senderWindow.remaining() >= MINIMUM_FRAME_SIZE) {
219 				LOG.debug("split frame at position " + senderWindow.remaining()
220 						+ " (channel=" + channel + ")");
221 				Frame[] split = frame.split(senderWindow.remaining());
222 				frames.addFirst(split[1]);
223 				return split[0];
224 			} else {
225 				frames.addFirst(frame);
226 				return null;
227 			}
228 		}
229 	}
230 	
231 }