1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package ca.uhn.hl7v2.app;
28
29 import ca.uhn.hl7v2.DefaultHapiContext;
30 import ca.uhn.hl7v2.HapiContext;
31 import ca.uhn.hl7v2.app.AcceptorThread.AcceptedSocket;
32 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
33 import ca.uhn.hl7v2.llp.LLPException;
34 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
35 import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
36 import ca.uhn.hl7v2.parser.Parser;
37 import ca.uhn.hl7v2.parser.PipeParser;
38 import ca.uhn.hl7v2.util.SocketFactory;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import java.io.File;
43 import java.io.IOException;
44 import java.net.Socket;
45 import java.util.HashMap;
46 import java.util.Map;
47 import java.util.concurrent.BlockingQueue;
48 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.LinkedBlockingQueue;
50 import java.util.concurrent.TimeUnit;
51
52
53
54
55
56
57
58
59 public class TwoPortService extends HL7Service {
60
61 private static final Logger log = LoggerFactory
62 .getLogger(TwoPortService.class);
63
64 private final Map<String, AcceptedSocket> waitingForSecondSocket = new HashMap<>();
65 private final int inboundPort;
66 private final int outboundPort;
67 private final boolean tls;
68 private final BlockingQueue<AcceptedSocket> queue;
69 private AcceptorThread inboundAcceptor, outboundAcceptor;
70 private final HapiContext hapiContext;
71
72 public TwoPortService(int inboundPort, int outboundPort) {
73 this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort,
74 outboundPort, false);
75 }
76
77 public TwoPortService(int inboundPort, int outboundPort, boolean tls) {
78 this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort,
79 outboundPort, tls);
80 }
81
82
83 public TwoPortService(Parser parser, LowerLayerProtocol llp,
84 int inboundPort, int outboundPort, boolean tls) {
85 this(parser, llp, inboundPort, outboundPort, tls,
86 DefaultExecutorService.getDefaultService());
87 }
88
89
90 public TwoPortService(HapiContext hapiContext,
91 int inboundPort, int outboundPort, boolean tls) {
92 super(hapiContext);
93 this.hapiContext = hapiContext;
94 this.queue = new LinkedBlockingQueue<>();
95 this.inboundPort = inboundPort;
96 this.outboundPort = outboundPort;
97 this.tls = tls;
98
99 if (inboundPort == outboundPort) {
100 throw new IllegalArgumentException("Inbound port and outbound port can not be the same");
101 }
102 if (inboundPort < 1) {
103 throw new IllegalArgumentException("Invalid inbound port");
104 }
105 if (outboundPort < 1) {
106 throw new IllegalArgumentException("Invalid outbound port");
107 }
108
109 }
110
111
112 public TwoPortService(Parser parser, LowerLayerProtocol llp,
113 int inboundPort, int outboundPort, boolean tls,
114 ExecutorService executorService) {
115 super(parser, llp, executorService);
116 this.hapiContext = new DefaultHapiContext();
117 this.queue = new LinkedBlockingQueue<>();
118 this.inboundPort = inboundPort;
119 this.outboundPort = outboundPort;
120 this.tls = tls;
121 }
122
123
124
125
126
127
128
129 @Override
130 protected void afterStartup() {
131 try {
132 super.afterStartup();
133 inboundAcceptor = createAcceptThread(inboundPort);
134 outboundAcceptor = createAcceptThread(outboundPort);
135 inboundAcceptor.start();
136 outboundAcceptor.start();
137 log.info("TwoPortService running on ports {} and {}", inboundPort,
138 outboundPort);
139 } catch (Exception e) {
140 log.error("Could not run TwoPortService on ports {} and {}",
141 inboundPort, outboundPort);
142 throw new RuntimeException(e);
143 }
144 }
145
146
147
148
149
150
151 @Override
152 protected void afterTermination() {
153 super.afterTermination();
154 inboundAcceptor.stop();
155 outboundAcceptor.stop();
156 }
157
158
159
160
161 protected void handle() {
162 if (inboundAcceptor.getServiceExitedWithException() != null) {
163 setServiceExitedWithException(inboundAcceptor.getServiceExitedWithException());
164 }
165 if (outboundAcceptor.getServiceExitedWithException() != null) {
166 setServiceExitedWithException(outboundAcceptor.getServiceExitedWithException());
167 }
168
169 try {
170 ActiveConnection conn = acceptConnection(queue.poll(2, TimeUnit.SECONDS));
171 if (conn != null) {
172 log.info("Accepted connection from "
173 + conn.getRemoteAddress().getHostAddress());
174 newConnection(conn);
175 }
176 } catch (Exception e) {
177 log.error("Error while accepting connections: ", e);
178 }
179 }
180
181
182
183
184
185
186 private ActiveConnection acceptConnection(AcceptedSocket newSocket)
187 throws LLPException, IOException {
188 ActiveConnection conn = null;
189 if (newSocket != null) {
190 String address = newSocket.socket.getInetAddress().getHostAddress();
191 AcceptedSocket otherSocket = waitingForSecondSocket.remove(address);
192 if (otherSocket != null && otherSocket.origin != newSocket.origin) {
193 log.debug("Socket {} completes a two-port connection",
194 newSocket.socket);
195 Socket in = getInboundSocket(newSocket, otherSocket);
196 Socket out = getOutboundSocket(newSocket, otherSocket);
197 conn = new ActiveConnection(getParser(), getLlp(), in, out,
198 getExecutorService());
199 } else {
200 log.debug(
201 "Registered {} Still waiting for second socket for two-port connection",
202 newSocket.socket);
203 waitingForSecondSocket.put(address, newSocket);
204 }
205 }
206 return conn;
207 }
208
209 private Socket getInboundSocket(AcceptedSocket socket1,
210 AcceptedSocket socket2) {
211 return socket1.origin == inboundAcceptor ? socket1.socket
212 : socket2.socket;
213 }
214
215 private Socket getOutboundSocket(AcceptedSocket socket1,
216 AcceptedSocket socket2) {
217 return socket1.origin == outboundAcceptor ? socket1.socket
218 : socket2.socket;
219 }
220
221 protected AcceptorThread createAcceptThread(int port) {
222 SocketFactory ss = this.hapiContext.getSocketFactory();
223 return new AcceptorThread(port, tls, getExecutorService(), queue, ss);
224 }
225
226
227
228
229
230
231
232
233 public static void main(String[] args) {
234 if (args.length < 2 || args.length > 3) {
235 System.out
236 .println("Usage: ca.uhn.hl7v2.app.TwoPortService inbound_port outbound_port [application_spec_file_name]");
237 System.exit(1);
238 }
239
240 int inPort = 0;
241 int outPort = 0;
242 try {
243 inPort = Integer.parseInt(args[0]);
244 outPort = Integer.parseInt(args[1]);
245 } catch (NumberFormatException e) {
246 System.err.println("One of the given ports (" + args[0] + " or "
247 + args[1] + ") is not an integer.");
248 System.exit(1);
249 }
250
251 File appFile = null;
252 if (args.length == 3) {
253 appFile = new File(args[2]);
254 }
255
256 try {
257 TwoPortService server = new TwoPortService(inPort, outPort);
258 if (appFile != null)
259 server.loadApplicationsFromFile(appFile);
260 server.start();
261 } catch (Exception e) {
262 e.printStackTrace();
263 }
264
265 }
266
267 }