1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal.tcp;
17
18 import java.util.HashMap;
19 import java.util.Map;
20
21 import net.sf.beep4j.Message;
22 import net.sf.beep4j.ProtocolException;
23 import net.sf.beep4j.internal.TransportMapping;
24 import net.sf.beep4j.internal.util.Assert;
25 import net.sf.beep4j.transport.Transport;
26
27 public class TCPMapping implements TransportMapping, ChannelControllerFactory {
28
29 private static final int DEFAULT_BUFFER_SIZE = 4096;
30
31 private final Transport transport;
32
33 private final ChannelControllerFactory factory;
34
35 private final int bufferSize;
36
37 private final Map<Integer, ChannelController> channels =
38 new HashMap<Integer, ChannelController>();
39
40 public TCPMapping(Transport transport) {
41 this(transport, null);
42 }
43
44 public TCPMapping(Transport transport, ChannelControllerFactory factory) {
45 this(transport, factory, DEFAULT_BUFFER_SIZE);
46 }
47
48 public TCPMapping(Transport transport, ChannelControllerFactory factory, int bufferSize) {
49 Assert.notNull("transport", transport);
50 this.transport = transport;
51 this.factory = factory != null ? factory : this;
52 this.bufferSize = bufferSize;
53 }
54
55
56
57
58 public void channelStarted(int channelNumber) {
59 if (channels.containsKey(channelNumber)) {
60 throw new IllegalArgumentException("there is already a channel for channel number: "
61 + channelNumber);
62 }
63 ChannelController controller = factory.createChannelController(channelNumber, transport);
64 channels.put(channelNumber, controller);
65 }
66
67 public void channelClosed(int channelNumber) {
68 channels.remove(channelNumber);
69 }
70
71
72
73
74
75
76 public DefaultChannelController createChannelController(int channelNumber, Transport transport) {
77 return new DefaultChannelController(transport, channelNumber, bufferSize);
78 }
79
80
81
82
83
84
85 public void checkFrame(int channel, long seqno, int size) {
86 getChannelController(channel).checkFrame(seqno, size);
87 }
88
89 public void frameReceived(int channel, long seqno, int size) {
90 getChannelController(channel).frameReceived(seqno, size);
91 }
92
93 public void processMappingFrame(String[] tokens) {
94 if (!tokens[0].equals(SEQHeader.TYPE)) {
95 throw new ProtocolException("unsupported frame type: " + tokens[0]);
96 }
97
98 SEQHeader header = new SEQHeader(tokens);
99 int channel = header.getChannel();
100 long ackno = header.getAcknowledgeNumber();
101 int size = header.getWindowSize();
102
103
104 getChannelController(channel).updateSendWindow(ackno, size);
105 }
106
107 public void sendANS(int channel, int messageNumber, int answerNumber, Message message) {
108 getChannelController(channel).sendANS(messageNumber, answerNumber, message);
109 }
110
111 public void sendERR(int channel, int messageNumber, Message message) {
112 getChannelController(channel).sendERR(messageNumber, message);
113 }
114
115 public void sendMSG(int channel, int messageNumber, Message message) {
116 getChannelController(channel).sendMSG(messageNumber, message);
117 }
118
119 public void sendNUL(int channel, int messageNumber) {
120 getChannelController(channel).sendNUL(messageNumber);
121 }
122
123 public void sendRPY(int channel, int messageNumber, Message message) {
124 getChannelController(channel).sendRPY(messageNumber, message);
125 }
126
127 public void closeTransport() {
128 transport.closeTransport();
129 }
130
131
132
133
134 private ChannelController getChannelController(int channel) {
135 ChannelController controller = channels.get(new Integer(channel));
136 if (controller == null) {
137 throw new ProtocolException("unkown channel: " + channel);
138 }
139 return controller;
140 }
141
142 }