1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal.tcp;
17
18 import java.nio.ByteBuffer;
19 import java.nio.charset.Charset;
20 import java.util.LinkedList;
21
22 import net.sf.beep4j.Message;
23 import net.sf.beep4j.ProtocolException;
24 import net.sf.beep4j.internal.DataHeader;
25 import net.sf.beep4j.internal.Frame;
26 import net.sf.beep4j.internal.MessageType;
27 import net.sf.beep4j.internal.DataHeader.ANSHeader;
28 import net.sf.beep4j.internal.util.Assert;
29 import net.sf.beep4j.transport.Transport;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 public class DefaultChannelController implements ChannelController {
35
36 private static final Logger LOG = LoggerFactory.getLogger(ChannelController.class);
37
38 private static final Charset ASCII_CHARSET = Charset.forName("US-ASCII");
39
40 public static final int MINIMUM_FRAME_SIZE = 1;
41
42 private final int channel;
43
44 private final SlidingWindow window;
45
46 private final SlidingWindow senderWindow;
47
48 private final LinkedList<Frame> frames = new LinkedList<Frame>();
49
50 private final Transport transport;
51
52 private long seqno;
53
54 public DefaultChannelController(Transport transport, int channel, int window) {
55 Assert.notNull("transport", transport);
56 this.transport = transport;
57 this.channel = channel;
58 this.senderWindow = new SlidingWindow(window);
59 this.window = new SlidingWindow(window);
60 }
61
62 public void updateSendWindow(long ackno, int size) {
63 LOG.debug("update send window: ackno=" + ackno + ",window=" + size);
64 senderWindow.slide(ackno, size);
65 sendFrames(transport);
66 }
67
68 public void sendANS(int messageNumber, int answerNumber, Message message) {
69 LOG.debug("sendANS to message " + messageNumber + " with answer number "
70 + answerNumber + " on channel " + channel);
71 ByteBuffer buffer = message.asByteBuffer();
72 DataHeader header = new ANSHeader(
73 channel, messageNumber, false,
74 seqno,
75 buffer.remaining(), answerNumber);
76
77 seqno += buffer.remaining();
78
79 Frame frame = new Frame(header, buffer);
80 enqueueFrame(frame);
81 sendFrames(transport);
82 }
83
84 public void sendERR(int messageNumber, Message message) {
85 LOG.debug("sendERR to message " + messageNumber + " on channel " + channel);
86 ByteBuffer buffer = message.asByteBuffer();
87 DataHeader header = new DataHeader(
88 MessageType.ERR,
89 channel, messageNumber, false,
90 seqno, buffer.remaining());
91
92 seqno += buffer.remaining();
93
94 Frame frame = new Frame(header, buffer);
95 enqueueFrame(frame);
96 sendFrames(transport);
97 }
98
99 public void sendMSG(int messageNumber, Message message) {
100 LOG.debug("sendMSG with message number " + messageNumber + " on channel " + channel);
101 ByteBuffer buffer = message.asByteBuffer();
102 DataHeader header = new DataHeader(
103 MessageType.MSG,
104 channel, messageNumber, false,
105 seqno, buffer.remaining());
106
107 seqno += buffer.remaining();
108
109 Frame frame = new Frame(header, buffer);
110 enqueueFrame(frame);
111 sendFrames(transport);
112 }
113
114 public void sendNUL(int messageNumber) {
115 LOG.debug("sendNUL to message " + messageNumber + " on channel " + channel);
116 DataHeader header = new DataHeader(
117 MessageType.NUL,
118 channel, messageNumber, false,
119 seqno, 0);
120
121 Frame frame = new Frame(header, ByteBuffer.allocate(0));
122 enqueueFrame(frame);
123 sendFrames(transport);
124 }
125
126 public void sendRPY(int messageNumber, Message message) {
127 LOG.debug("sendRPY to message " + messageNumber + " on channel " + channel);
128 ByteBuffer buffer = message.asByteBuffer();
129 DataHeader header = new DataHeader(
130 MessageType.RPY,
131 channel, messageNumber, false,
132 seqno, buffer.remaining());
133
134 seqno += buffer.remaining();
135
136 Frame frame = new Frame(header, buffer);
137 enqueueFrame(frame);
138 int count = sendFrames(transport);
139 LOG.debug("sendRPY caused " + count + " frames to be sent");
140 }
141
142 long id;
143
144 public synchronized void checkFrame(long seqno, int payloadSize) {
145 if (seqno != window.getPosition()) {
146 throw new ProtocolException("sequence number " + seqno + " does not "
147 + "match expected sequence number " + window.getPosition());
148 }
149 if (window.remaining() < payloadSize) {
150 throw new ProtocolException("message larger than remaining window size (remaining="
151 + window.remaining() + ",payload size=" + payloadSize + ")");
152 }
153 }
154
155 public void frameReceived(long seqno, int size) {
156 if (seqno != window.getPosition()) {
157 throw new IllegalStateException("sequence number " + seqno + " does not "
158 + "match expected sequence number " + window.getPosition());
159 }
160
161 LOG.debug("frameReceived on channel " + channel + ": seqno=" + seqno + ",size=" + size);
162 window.moveBy(size);
163 LOG.debug("receiver window = " + window);
164
165 if (window.remaining() <= 0.5 * window.getWindowSize()) {
166 long ackno = seqno + size;
167 int windowSize = window.getWindowSize();
168 window.slide(ackno, windowSize);
169 LOG.debug("sending SEQ frame on channel " + channel + ": ackno=" + ackno + ",window=" + windowSize);
170 LOG.debug("receiver window = " + window);
171 transport.sendBytes(createSEQFrame(channel, ackno, windowSize));
172 }
173 }
174
175 private ByteBuffer createSEQFrame(int channel, long ackno, int window) {
176 StringBuilder buf = new StringBuilder(SEQHeader.TYPE);
177 buf.append(" ");
178 buf.append(channel);
179 buf.append(" ");
180 buf.append(ackno);
181 buf.append(" ");
182 buf.append(window);
183 buf.append("\r\n");
184 return ASCII_CHARSET.encode(buf.toString());
185 }
186
187 private void enqueueFrame(Frame frame) {
188 frames.addLast(frame);
189 }
190
191 protected int sendFrames(Transport transport) {
192 int count = 0;
193 Frame frame;
194
195 while ((frame = nextFrame()) != null) {
196 LOG.debug("send frame " + frame.getHeader());
197 senderWindow.moveBy(frame.getSize());
198 frame.send(transport);
199 LOG.debug("sender window = " + senderWindow);
200 count++;
201 }
202
203 return count;
204 }
205
206 private Frame nextFrame() {
207 if (frames.isEmpty()) {
208 return null;
209 } else {
210 Frame frame = frames.removeFirst();
211
212 if (frame.getSize() <= senderWindow.remaining()) {
213 LOG.debug("sending frame unchanged (channel=" + channel + ")");
214 if (frames.isEmpty()) {
215 LOG.debug("sending last frame in buffer (channel=" + channel + ")");
216 }
217 return frame;
218 } else if (senderWindow.remaining() >= MINIMUM_FRAME_SIZE) {
219 LOG.debug("split frame at position " + senderWindow.remaining()
220 + " (channel=" + channel + ")");
221 Frame[] split = frame.split(senderWindow.remaining());
222 frames.addFirst(split[1]);
223 return split[0];
224 } else {
225 frames.addFirst(frame);
226 return null;
227 }
228 }
229 }
230
231 }