1   /*
2    *  Copyright 2006 Simon Raess
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.beep4j.integration;
17  
18  import java.io.BufferedReader;
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.io.InputStreamReader;
22  import java.io.Writer;
23  import java.net.SocketAddress;
24  import java.util.concurrent.Semaphore;
25  
26  import junit.framework.Assert;
27  import junit.framework.TestCase;
28  import net.sf.beep4j.Channel;
29  import net.sf.beep4j.CloseChannelCallback;
30  import net.sf.beep4j.Initiator;
31  import net.sf.beep4j.Message;
32  import net.sf.beep4j.MessageBuilder;
33  import net.sf.beep4j.ProfileInfo;
34  import net.sf.beep4j.Reply;
35  import net.sf.beep4j.ReplyHandler;
36  import net.sf.beep4j.Session;
37  import net.sf.beep4j.SessionHandler;
38  import net.sf.beep4j.SessionHandlerFactory;
39  import net.sf.beep4j.StartChannelRequest;
40  import net.sf.beep4j.StartSessionRequest;
41  import net.sf.beep4j.ext.ChannelHandlerAdapter;
42  import net.sf.beep4j.ext.SessionHandlerAdapter;
43  import net.sf.beep4j.transport.mina.MinaInitiator;
44  import net.sf.beep4j.transport.mina.MinaListener;
45  
46  import org.apache.mina.common.IoAcceptor;
47  import org.apache.mina.common.IoConnector;
48  import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
49  import org.apache.mina.transport.vmpipe.VmPipeAddress;
50  import org.apache.mina.transport.vmpipe.VmPipeConnector;
51  
52  public class EchoIntegrationTest extends TestCase {
53  	
54  	public void testOneToManyEcho() throws Exception {
55  		ProfileInfo profile = new ProfileInfo(OneToManyEchoProfileHandler.PROFILE, "8192");
56  		String text = loadMessage("rfc3080.txt");
57  		doTest(profile, 1, text, 8001);
58  	}
59  	
60  	public void testEcho() throws Exception {
61  		ProfileInfo profile = new ProfileInfo(EchoProfileHandler.PROFILE);
62  		String text = loadMessage("rfc3080.txt");
63  		doTest(profile, 1, text, 8001);
64  	}
65  	
66  	public void testSimultanousEcho() throws Exception {
67  		ProfileInfo profile = new ProfileInfo(EchoProfileHandler.PROFILE);
68  		String text = loadMessage("rfc3080.txt");
69  		doTest(profile, 3, text, 8001);
70  	}
71  	
72  	public void testOneToManySimultanousEcho() throws Exception {
73  		ProfileInfo profile = new ProfileInfo(OneToManyEchoProfileHandler.PROFILE, "8192");
74  		String text = loadMessage("rfc3080.txt");
75  		doTest(profile, 3, text, 8001);
76  	}
77  	
78  	protected void doTest(ProfileInfo profile, int channels, String text, int port) throws Exception {
79  		Semaphore sem = new Semaphore(-channels);
80  		
81  		IoAcceptor acceptor = new VmPipeAcceptor();
82  		
83  		SocketAddress address = new VmPipeAddress(port);
84  
85  		MinaListener listener = new MinaListener(acceptor);
86  		listener.bind(address, new EchoSessionHandlerFactory(sem));
87  		
88  		IoConnector connector = new VmPipeConnector();
89  		EchoClientHandler client = new EchoClientHandler(profile, channels, text, sem);
90  		
91  		Initiator initiator = new MinaInitiator(connector);
92  		initiator.connect(address, client);
93  		
94  		sem.acquire();
95  		listener.unbind(address);
96  		
97  		client.assertEquals(text);
98  	}
99  	
100 	private String loadMessage(String resource) throws IOException {
101 		InputStream stream = getClass().getResourceAsStream(resource);
102 		BufferedReader reader = new BufferedReader(new InputStreamReader(stream, "US-ASCII"));
103 		StringBuilder builder = new StringBuilder();
104 		
105 		char[] buf = new char[1024];
106 		int count;
107 		
108 		while ((count = reader.read(buf)) != -1) {
109 			builder.append(new String(buf, 0, count));
110 		}
111 		
112 		reader.close();
113 		return builder.toString();
114 	}
115 	
116 	private static class EchoSessionHandlerFactory implements SessionHandlerFactory {
117 		private final Semaphore semaphore;
118 		private EchoSessionHandlerFactory(Semaphore semaphore) {
119 			this.semaphore = semaphore;
120 		}
121 		public SessionHandler createSessionHandler() {
122 			return new EchoSessionHandler(semaphore);
123 		}
124 	}
125 	
126 	protected static class EchoSessionHandler extends SessionHandlerAdapter {
127 		private final Semaphore semaphore;
128 		
129 		private EchoSessionHandler(Semaphore semaphore) {
130 			this.semaphore = semaphore;
131 		}
132 		
133 		public void connectionEstablished(StartSessionRequest s) {
134 			s.registerProfile(EchoProfileHandler.PROFILE);
135 			s.registerProfile(OneToManyEchoProfileHandler.PROFILE);
136 		}
137 		
138 		@Override
139 		public void channelStartRequested(StartChannelRequest startup) {
140 			if (startup.hasProfile(EchoProfileHandler.PROFILE)) {
141 				startup.selectProfile(
142 						startup.getProfile(EchoProfileHandler.PROFILE), 
143 						new EchoProfileHandler());				
144 			} else if (startup.hasProfile(OneToManyEchoProfileHandler.PROFILE)) {
145 				ProfileInfo profile = startup.getProfile(OneToManyEchoProfileHandler.PROFILE);
146 				int size = getSize(profile.getContent());
147 				startup.selectProfile(profile, new OneToManyEchoProfileHandler(size));
148 			}
149 		}
150 		
151 		private int getSize(String content) {
152 			try {
153 				return Integer.parseInt(content.trim());
154 			} catch (Exception e) {
155 				return 8192;
156 			}
157 		}
158 		
159 		@Override
160 		public void sessionClosed() {
161 			semaphore.release();
162 		}
163 	}
164 	
165 	private class EchoClientHandler extends SessionHandlerAdapter {
166 		private final ProfileInfo profile;
167 		private final String text;
168 		private final Semaphore semaphore;
169 		private Talker[] talkers;
170 		
171 		private EchoClientHandler(ProfileInfo profile, int channels, String text, Semaphore semaphore) {
172 			this.profile = profile;
173 			this.talkers = new Talker[channels];
174 			this.text = text;
175 			this.semaphore = semaphore;
176 		}
177 		
178 		public void assertEquals(String text) {
179 			for (int i = 0; i < talkers.length; i++) {
180 				Talker talker = talkers[i];
181 				talker.assertEquals(text);
182 			}
183 		}
184 		
185 		@Override
186 		public void sessionOpened(Session session) {
187 			for (int i = 0; i < talkers.length; i++) {
188 				talkers[i] = new Talker(text, semaphore);
189 				session.startChannel(profile, talkers[i]);
190 			}
191 		}
192 	}
193 	
194 	protected class Talker extends ChannelHandlerAdapter {
195 		
196 		private final String expected;
197 		
198 		private final Semaphore semaphore;
199 		
200 		private EchoListener listener;
201 		
202 		protected Talker(String expected, Semaphore semaphore) {
203 			this.expected = expected;
204 			this.semaphore = semaphore;
205 		}
206 		
207 		public void assertEquals(String text) {
208 			Assert.assertEquals(text, listener.getReceivedText());
209 		}
210 		
211 		public void channelOpened(Channel c) {
212 			MessageBuilder builder = c.createMessageBuilder();
213 			builder.setCharsetName("US-ASCII");
214 			try {
215 				Writer writer = builder.getWriter();
216 				writer.write(expected);
217 				writer.close();
218 				listener = new EchoListener(c, semaphore);
219 				c.sendMessage(builder.getMessage(), listener);
220 			} catch (IOException e) {
221 				throw new RuntimeException(e);
222 			}
223 		}
224 		
225 		public void messageReceived(Message message, Reply handler) {
226 			throw new UnsupportedOperationException();
227 		}
228 		
229 	}
230 	
231 	protected class EchoListener implements ReplyHandler {
232 		private final Channel channel;
233 		private final Semaphore semaphore;
234 		private final StringBuilder builder;
235 		private String actual;
236 		
237 		protected EchoListener(Channel channel, Semaphore semaphore) {
238 			this.channel = channel;
239 			this.semaphore = semaphore;
240 			this.builder = new StringBuilder();
241 		}
242 		
243 		protected String getReceivedText() {
244 			return actual;
245 		}
246 		
247 		public void receivedANS(Message message) {
248 			String str = toString(message);
249 			builder.append(str);
250 		}
251 		
252 		public void receivedERR(Message message) {
253 			throw new UnsupportedOperationException();
254 		}
255 		
256 		public void receivedNUL() {
257 			verify();
258 			channel.close(new PrintingCloseCallback(channel.getSession(), semaphore));
259 		}
260 		
261 		public void receivedRPY(Message message) {
262 			builder.append(toString(message));
263 			verify();
264 			channel.close(new PrintingCloseCallback(channel.getSession(), semaphore));
265 		}
266 		
267 		private String toString(Message message) {
268 			BufferedReader reader = new BufferedReader(message.getReader("UTF-8"));
269 			StringBuilder builder = new StringBuilder();
270 			int i;
271 			
272 			try {
273 				while ((i = reader.read()) != -1) {
274 					builder.append((char) i);
275 				}				
276 			} catch (IOException e) {
277 				throw new RuntimeException(e);
278 			}
279 			
280 			return builder.toString();
281 		}
282 		
283 		private void verify() {
284 			actual = builder.toString();
285 		}
286 	}
287 	
288 	private class PrintingCloseCallback implements CloseChannelCallback {
289 		private final Session session;
290 		private final Semaphore semaphore;
291 		
292 		private PrintingCloseCallback(Session session, Semaphore semaphore) {
293 			this.session = session;
294 			this.semaphore = semaphore;
295 		}
296 		
297 		public void closeAccepted() {
298 			if (semaphore.availablePermits() >= -1) {
299 				session.close();
300 			}
301 			semaphore.release();
302 		}
303 		
304 		public void closeDeclined(int code, String message) { }
305 	}	
306 
307 }