1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.beep4j.internal;
17
18 import java.nio.ByteBuffer;
19 import java.util.Arrays;
20
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24
25 public class DefaultStreamParser implements StreamParser, ParseStateContext {
26
27 private static final Logger LOG = LoggerFactory.getLogger(StreamParser.class);
28
29 private final TransportMapping mapping;
30
31 private final FrameHandler handler;
32
33 private final ParseState headerState = new HeaderState();
34
35 private final ParseState trailerState = new TrailerState();
36
37
38
39 private ParseState state;
40
41 private DataHeader header;
42
43 private ByteBuffer payload;
44
45 public DefaultStreamParser(FrameHandler handler, TransportMapping mapping) {
46 this.handler = handler;
47 this.mapping = mapping;
48 this.state = headerState;
49 }
50
51 public void process(ByteBuffer buffer) {
52 while (state.process(buffer, this));
53 }
54
55 protected void forward(Frame frame) {
56 handler.handleFrame(frame);
57 if (frame.getHeader().getPayloadSize() > 0) {
58 mapping.frameReceived(
59 frame.getChannelNumber(), frame.getSequenceNumber(), frame.getSize());
60 }
61 }
62
63 public void handleHeader(String[] tokens) {
64 LOG.debug("got header: " + Arrays.toString(tokens));
65
66 if (isStandardType(tokens[0])) {
67 header = DataHeader.parseHeader(tokens);
68
69 int channel = header.getChannel();
70 int size = header.getPayloadSize();
71 long seqno = header.getSequenceNumber();
72 mapping.checkFrame(channel, seqno, size);
73
74 LOG.debug("moving to payload state");
75 state = new PayloadState(header.getPayloadSize());
76
77 } else {
78 mapping.processMappingFrame(tokens);
79 }
80 }
81
82 private boolean isStandardType(String type) {
83 return MessageType.ANS.name().equals(type)
84 || MessageType.ERR.name().equals(type)
85 || MessageType.MSG.name().equals(type)
86 || MessageType.NUL.name().equals(type)
87 || MessageType.RPY.name().equals(type);
88 }
89
90 public void handlePayload(ByteBuffer payload) {
91 LOG.debug("got payload, moving to trailer state");
92 this.payload = payload;
93 state = trailerState;
94 }
95
96 public void handleTrailer() {
97 LOG.debug("got trailer, moving to header state");
98 Frame frame = new Frame(header, payload);
99 forward(frame);
100 header = null;
101 payload = null;
102 state = headerState;
103 }
104
105 }