1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.transport.mina;
17
18 import net.sf.beep4j.ChannelFilterChainBuilder;
19 import net.sf.beep4j.SessionHandler;
20 import net.sf.beep4j.internal.SessionImpl;
21 import net.sf.beep4j.internal.TransportMapping;
22 import net.sf.beep4j.internal.tcp.TCPMapping;
23 import net.sf.beep4j.internal.util.HexDump;
24 import net.sf.beep4j.transport.LoggingTransportContext;
25 import net.sf.beep4j.transport.Transport;
26 import net.sf.beep4j.transport.TransportContext;
27
28 import org.apache.mina.common.ByteBuffer;
29 import org.apache.mina.common.IoHandlerAdapter;
30 import org.apache.mina.common.IoSession;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34
35
36
37
38
39
40 public class MinaTransport extends IoHandlerAdapter implements Transport {
41
42 private static final Logger DATA_LOG = LoggerFactory.getLogger("net.sf.beep4j.transport.DATA");
43
44 private static final Logger LOG = LoggerFactory.getLogger("net.sf.beep4j.transport");
45
46 private IoSession session;
47
48 private TransportContext context;
49
50 public MinaTransport(boolean initiator, SessionHandler sessionHandler, ChannelFilterChainBuilder builder) {
51 TransportMapping mapping = new TCPMapping(this);
52 context = new LoggingTransportContext(new SessionImpl(initiator, sessionHandler, mapping));
53 }
54
55 public void sendBytes(java.nio.ByteBuffer buffer) {
56 if (LOG.isDebugEnabled()) {
57 LOG.debug("sending " + buffer.remaining() + " bytes");
58 }
59 if (DATA_LOG.isDebugEnabled()) {
60 DATA_LOG.debug(HexDump.dump(buffer));
61 }
62 session.write(ByteBuffer.wrap(buffer));
63 }
64
65 public void closeTransport() {
66 if (LOG.isDebugEnabled()) {
67 LOG.debug("close transport");
68 }
69 session.close();
70 }
71
72 @Override
73 public void sessionOpened(IoSession session) throws Exception {
74 if (LOG.isDebugEnabled()) {
75 LOG.debug("transport session opened");
76 }
77 this.session = session;
78 context.connectionEstablished(session.getRemoteAddress());
79 }
80
81 @Override
82 public void messageSent(IoSession session, Object message) throws Exception {
83 LOG.debug("bytes sent on underlying transport");
84 }
85
86 @Override
87 public void messageReceived(IoSession session, Object message) throws Exception {
88 ByteBuffer buffer = (ByteBuffer) message;
89 if (LOG.isDebugEnabled()) {
90 LOG.debug("received " + buffer.remaining() + " bytes");
91 }
92 if (DATA_LOG.isDebugEnabled()) {
93 DATA_LOG.debug(HexDump.dump(buffer.buf()));
94 }
95 context.messageReceived(buffer.buf());
96 }
97
98 @Override
99 public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
100 if (LOG.isDebugEnabled()) {
101 LOG.debug("caugth exception", cause);
102 }
103 context.exceptionCaught(cause);
104 }
105
106 @Override
107 public void sessionClosed(IoSession session) throws Exception {
108 if (LOG.isDebugEnabled()) {
109 LOG.debug("transport session closed by remote peer");
110 }
111 context.connectionClosed();
112 }
113
114 }