1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package ca.uhn.hl7v2.app;
28
29 import java.io.IOException;
30 import java.net.InetAddress;
31 import java.net.Socket;
32 import java.util.ArrayList;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38
39 import javax.net.ssl.SSLSocket;
40
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import ca.uhn.hl7v2.concurrent.BlockingMap;
45 import ca.uhn.hl7v2.app.Receiver.ReceiverParserExceptionHandler;
46 import ca.uhn.hl7v2.concurrent.BlockingHashMap;
47 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
48 import ca.uhn.hl7v2.llp.HL7Writer;
49 import ca.uhn.hl7v2.llp.LLPException;
50 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
51 import ca.uhn.hl7v2.parser.Parser;
52
53
54
55
56
57
58 public class ActiveConnection implements Connection {
59
60 private static final Logger log = LoggerFactory.getLogger(ActiveConnection.class);
61
62 private final Initiator initiator;
63 private Responder responder;
64 private List<Socket> sockets;
65 private final HL7Writer ackWriter;
66 private final HL7Writer sendWriter;
67 private Parser parser;
68 private BlockingMap<String, String> responses;
69 private List<Receiver> receivers;
70 private boolean open = true;
71 private ExecutorService executorService;
72 private boolean acceptAll = false;
73
74
75
76
77
78 public ActiveConnection(Parser parser, LowerLayerProtocol llp,
79 Socket bidirectional) throws LLPException, IOException {
80 this(parser, llp, bidirectional, DefaultExecutorService
81 .getDefaultService());
82 }
83
84 public ActiveConnection(Parser parser, LowerLayerProtocol llp,
85 Socket bidirectional, ExecutorService executorService)
86 throws LLPException, IOException {
87 init(parser, executorService, bidirectional);
88 ackWriter = llp.getWriter(bidirectional.getOutputStream());
89 sendWriter = ackWriter;
90 this.executorService = executorService;
91 sockets.add(bidirectional);
92 receivers.add(new Receiver(this, llp.getReader(bidirectional
93 .getInputStream())));
94 this.initiator = new ActiveInitiator(this);
95 }
96
97 public ActiveConnection(Parser parser, LowerLayerProtocol llp,
98 Socket bidirectional, ExecutorService executorService,
99 boolean acceptAllMsg)
100 throws LLPException, IOException {
101 this(parser, llp, bidirectional, executorService);
102 acceptAll = acceptAllMsg;
103 }
104
105
106
107
108
109 public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
110 Socket outbound) throws LLPException, IOException {
111 this(parser, llp, inbound, outbound, DefaultExecutorService
112 .getDefaultService());
113 }
114
115
116
117
118
119 public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
120 Socket outbound, ExecutorService executorService)
121 throws LLPException, IOException {
122 init(parser, executorService, inbound);
123 ackWriter = llp.getWriter(inbound.getOutputStream());
124 sendWriter = llp.getWriter(outbound.getOutputStream());
125 sockets.add(outbound);
126 sockets.add(inbound);
127
128 receivers.add(new Receiver(this,
129 llp.getReader(inbound.getInputStream())));
130 receivers.add(new Receiver(this, llp.getReader(outbound
131 .getInputStream())));
132 this.initiator = new ActiveInitiator(this);
133 }
134
135
136 private void init(Parser parser, ExecutorService executorService, Socket inboundSocket) {
137 this.parser = parser;
138 this.executorService = executorService;
139 sockets = new ArrayList<>();
140 responses = new BlockingHashMap<>(executorService);
141 receivers = new ArrayList<>(2);
142 responder = new Responder(inboundSocket);
143 }
144
145
146
147
148 public void setReceiverParserExeptionHandler(ReceiverParserExceptionHandler parserExeptionHandler) {
149 if (receivers != null) {
150 for (Receiver receiver : receivers) {
151 receiver.setParserExeptionHandler(parserExeptionHandler);
152 }
153 }
154 }
155
156
157
158
159 public void activate() {
160 if (receivers != null) {
161 for (Receiver receiver : receivers) {
162 receiver.start();
163 }
164 }
165 }
166
167 public ExecutorService getExecutorService() {
168 return executorService;
169 }
170
171
172
173
174
175
176
177 public InetAddress getRemoteAddress() {
178 Socket s = sockets.get(0);
179 return s.getInetAddress();
180 }
181
182
183
184
185
186
187 public Integer getRemotePort() {
188 Socket s = sockets.get(0);
189 return s.getPort();
190 }
191
192
193 public Initiator getInitiator() {
194 return this.initiator;
195 }
196
197
198 public Responder getResponder() {
199 return this.responder;
200 }
201
202 public boolean isSecure() {
203 if (isOpen() && sockets.size() > 0) {
204 return (sockets.get(0) instanceof SSLSocket);
205 } else {
206 throw new IllegalStateException(
207 "Can't determine status on closed socket");
208 }
209 }
210
211
212
213
214
215 protected HL7Writer getSendWriter() {
216 return this.sendWriter;
217 }
218
219
220
221
222
223 protected HL7Writer getAckWriter() {
224 return this.ackWriter;
225 }
226
227 public Parser getParser() {
228 return this.parser;
229 }
230
231 public String toString() {
232 StringBuilder buf = new StringBuilder();
233 buf.append(getRemoteAddress().getHostName());
234 buf.append(":");
235 for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
236 Socket socket = iter.next();
237 buf.append(socket.getPort());
238 if (iter.hasNext())
239 buf.append(",");
240 }
241 return buf.toString();
242 }
243
244
245
246
247
248 protected Future<String> waitForResponse(final String messageID,
249 long timeout) throws InterruptedException {
250 return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
251 }
252
253
254
255
256
257 protected boolean isRecipientWaiting(String ackID, String message) {
258 return responses.give(ackID, message);
259 }
260
261
262 public void close() {
263
264 for (Receiver receiver : receivers) {
265 if (receiver.isRunning())
266 receiver.stop();
267 }
268
269
270 for (Socket socket : sockets) {
271 try {
272 if (!socket.isClosed())
273 socket.close();
274 } catch (Exception e) {
275 log.error("Error while stopping threads and closing sockets", e);
276 }
277 }
278
279 open = false;
280 }
281
282 public boolean isOpen() {
283 return open;
284 }
285
286 public boolean acceptAllMessages(){
287 return acceptAll;
288 }
289
290 }