1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.integration;
17
18 import java.io.BufferedReader;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.InputStreamReader;
22 import java.io.Writer;
23 import java.net.SocketAddress;
24 import java.util.concurrent.Semaphore;
25
26 import junit.framework.Assert;
27 import junit.framework.TestCase;
28 import net.sf.beep4j.Channel;
29 import net.sf.beep4j.CloseChannelCallback;
30 import net.sf.beep4j.Initiator;
31 import net.sf.beep4j.Message;
32 import net.sf.beep4j.MessageBuilder;
33 import net.sf.beep4j.ProfileInfo;
34 import net.sf.beep4j.Reply;
35 import net.sf.beep4j.ReplyHandler;
36 import net.sf.beep4j.Session;
37 import net.sf.beep4j.SessionHandler;
38 import net.sf.beep4j.SessionHandlerFactory;
39 import net.sf.beep4j.StartChannelRequest;
40 import net.sf.beep4j.StartSessionRequest;
41 import net.sf.beep4j.ext.ChannelHandlerAdapter;
42 import net.sf.beep4j.ext.SessionHandlerAdapter;
43 import net.sf.beep4j.transport.mina.MinaInitiator;
44 import net.sf.beep4j.transport.mina.MinaListener;
45
46 import org.apache.mina.common.IoAcceptor;
47 import org.apache.mina.common.IoConnector;
48 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
49 import org.apache.mina.transport.vmpipe.VmPipeAddress;
50 import org.apache.mina.transport.vmpipe.VmPipeConnector;
51
52 public class EchoIntegrationTest extends TestCase {
53
54 public void testOneToManyEcho() throws Exception {
55 ProfileInfo profile = new ProfileInfo(OneToManyEchoProfileHandler.PROFILE, "8192");
56 String text = loadMessage("rfc3080.txt");
57 doTest(profile, 1, text, 8001);
58 }
59
60 public void testEcho() throws Exception {
61 ProfileInfo profile = new ProfileInfo(EchoProfileHandler.PROFILE);
62 String text = loadMessage("rfc3080.txt");
63 doTest(profile, 1, text, 8001);
64 }
65
66 public void testSimultanousEcho() throws Exception {
67 ProfileInfo profile = new ProfileInfo(EchoProfileHandler.PROFILE);
68 String text = loadMessage("rfc3080.txt");
69 doTest(profile, 3, text, 8001);
70 }
71
72 public void testOneToManySimultanousEcho() throws Exception {
73 ProfileInfo profile = new ProfileInfo(OneToManyEchoProfileHandler.PROFILE, "8192");
74 String text = loadMessage("rfc3080.txt");
75 doTest(profile, 3, text, 8001);
76 }
77
78 protected void doTest(ProfileInfo profile, int channels, String text, int port) throws Exception {
79 Semaphore sem = new Semaphore(-channels);
80
81 IoAcceptor acceptor = new VmPipeAcceptor();
82
83 SocketAddress address = new VmPipeAddress(port);
84
85 MinaListener listener = new MinaListener(acceptor);
86 listener.bind(address, new EchoSessionHandlerFactory(sem));
87
88 IoConnector connector = new VmPipeConnector();
89 EchoClientHandler client = new EchoClientHandler(profile, channels, text, sem);
90
91 Initiator initiator = new MinaInitiator(connector);
92 initiator.connect(address, client);
93
94 sem.acquire();
95 listener.unbind(address);
96
97 client.assertEquals(text);
98 }
99
100 private String loadMessage(String resource) throws IOException {
101 InputStream stream = getClass().getResourceAsStream(resource);
102 BufferedReader reader = new BufferedReader(new InputStreamReader(stream, "US-ASCII"));
103 StringBuilder builder = new StringBuilder();
104
105 char[] buf = new char[1024];
106 int count;
107
108 while ((count = reader.read(buf)) != -1) {
109 builder.append(new String(buf, 0, count));
110 }
111
112 reader.close();
113 return builder.toString();
114 }
115
116 private static class EchoSessionHandlerFactory implements SessionHandlerFactory {
117 private final Semaphore semaphore;
118 private EchoSessionHandlerFactory(Semaphore semaphore) {
119 this.semaphore = semaphore;
120 }
121 public SessionHandler createSessionHandler() {
122 return new EchoSessionHandler(semaphore);
123 }
124 }
125
126 protected static class EchoSessionHandler extends SessionHandlerAdapter {
127 private final Semaphore semaphore;
128
129 private EchoSessionHandler(Semaphore semaphore) {
130 this.semaphore = semaphore;
131 }
132
133 public void connectionEstablished(StartSessionRequest s) {
134 s.registerProfile(EchoProfileHandler.PROFILE);
135 s.registerProfile(OneToManyEchoProfileHandler.PROFILE);
136 }
137
138 @Override
139 public void channelStartRequested(StartChannelRequest startup) {
140 if (startup.hasProfile(EchoProfileHandler.PROFILE)) {
141 startup.selectProfile(
142 startup.getProfile(EchoProfileHandler.PROFILE),
143 new EchoProfileHandler());
144 } else if (startup.hasProfile(OneToManyEchoProfileHandler.PROFILE)) {
145 ProfileInfo profile = startup.getProfile(OneToManyEchoProfileHandler.PROFILE);
146 int size = getSize(profile.getContent());
147 startup.selectProfile(profile, new OneToManyEchoProfileHandler(size));
148 }
149 }
150
151 private int getSize(String content) {
152 try {
153 return Integer.parseInt(content.trim());
154 } catch (Exception e) {
155 return 8192;
156 }
157 }
158
159 @Override
160 public void sessionClosed() {
161 semaphore.release();
162 }
163 }
164
165 private class EchoClientHandler extends SessionHandlerAdapter {
166 private final ProfileInfo profile;
167 private final String text;
168 private final Semaphore semaphore;
169 private Talker[] talkers;
170
171 private EchoClientHandler(ProfileInfo profile, int channels, String text, Semaphore semaphore) {
172 this.profile = profile;
173 this.talkers = new Talker[channels];
174 this.text = text;
175 this.semaphore = semaphore;
176 }
177
178 public void assertEquals(String text) {
179 for (int i = 0; i < talkers.length; i++) {
180 Talker talker = talkers[i];
181 talker.assertEquals(text);
182 }
183 }
184
185 @Override
186 public void sessionOpened(Session session) {
187 for (int i = 0; i < talkers.length; i++) {
188 talkers[i] = new Talker(text, semaphore);
189 session.startChannel(profile, talkers[i]);
190 }
191 }
192 }
193
194 protected class Talker extends ChannelHandlerAdapter {
195
196 private final String expected;
197
198 private final Semaphore semaphore;
199
200 private EchoListener listener;
201
202 protected Talker(String expected, Semaphore semaphore) {
203 this.expected = expected;
204 this.semaphore = semaphore;
205 }
206
207 public void assertEquals(String text) {
208 Assert.assertEquals(text, listener.getReceivedText());
209 }
210
211 public void channelOpened(Channel c) {
212 MessageBuilder builder = c.createMessageBuilder();
213 builder.setCharsetName("US-ASCII");
214 try {
215 Writer writer = builder.getWriter();
216 writer.write(expected);
217 writer.close();
218 listener = new EchoListener(c, semaphore);
219 c.sendMessage(builder.getMessage(), listener);
220 } catch (IOException e) {
221 throw new RuntimeException(e);
222 }
223 }
224
225 public void messageReceived(Message message, Reply handler) {
226 throw new UnsupportedOperationException();
227 }
228
229 }
230
231 protected class EchoListener implements ReplyHandler {
232 private final Channel channel;
233 private final Semaphore semaphore;
234 private final StringBuilder builder;
235 private String actual;
236
237 protected EchoListener(Channel channel, Semaphore semaphore) {
238 this.channel = channel;
239 this.semaphore = semaphore;
240 this.builder = new StringBuilder();
241 }
242
243 protected String getReceivedText() {
244 return actual;
245 }
246
247 public void receivedANS(Message message) {
248 String str = toString(message);
249 builder.append(str);
250 }
251
252 public void receivedERR(Message message) {
253 throw new UnsupportedOperationException();
254 }
255
256 public void receivedNUL() {
257 verify();
258 channel.close(new PrintingCloseCallback(channel.getSession(), semaphore));
259 }
260
261 public void receivedRPY(Message message) {
262 builder.append(toString(message));
263 verify();
264 channel.close(new PrintingCloseCallback(channel.getSession(), semaphore));
265 }
266
267 private String toString(Message message) {
268 BufferedReader reader = new BufferedReader(message.getReader("UTF-8"));
269 StringBuilder builder = new StringBuilder();
270 int i;
271
272 try {
273 while ((i = reader.read()) != -1) {
274 builder.append((char) i);
275 }
276 } catch (IOException e) {
277 throw new RuntimeException(e);
278 }
279
280 return builder.toString();
281 }
282
283 private void verify() {
284 actual = builder.toString();
285 }
286 }
287
288 private class PrintingCloseCallback implements CloseChannelCallback {
289 private final Session session;
290 private final Semaphore semaphore;
291
292 private PrintingCloseCallback(Session session, Semaphore semaphore) {
293 this.session = session;
294 this.semaphore = semaphore;
295 }
296
297 public void closeAccepted() {
298 if (semaphore.availablePermits() >= -1) {
299 session.close();
300 }
301 semaphore.release();
302 }
303
304 public void closeDeclined(int code, String message) { }
305 }
306
307 }