1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
|
21 | |
|
22 | |
|
23 | |
|
24 | |
|
25 | |
|
26 | |
|
27 | |
package ca.uhn.hl7v2.app; |
28 | |
|
29 | |
import java.io.File; |
30 | |
import java.io.IOException; |
31 | |
import java.net.Socket; |
32 | |
import java.net.SocketException; |
33 | |
import java.util.HashMap; |
34 | |
import java.util.Map; |
35 | |
import java.util.concurrent.BlockingQueue; |
36 | |
import java.util.concurrent.ExecutorService; |
37 | |
import java.util.concurrent.LinkedBlockingQueue; |
38 | |
import java.util.concurrent.TimeUnit; |
39 | |
|
40 | |
import org.slf4j.Logger; |
41 | |
import org.slf4j.LoggerFactory; |
42 | |
|
43 | |
import ca.uhn.hl7v2.DefaultHapiContext; |
44 | |
import ca.uhn.hl7v2.HapiContext; |
45 | |
import ca.uhn.hl7v2.app.AcceptorThread.AcceptedSocket; |
46 | |
import ca.uhn.hl7v2.concurrent.DefaultExecutorService; |
47 | |
import ca.uhn.hl7v2.llp.LLPException; |
48 | |
import ca.uhn.hl7v2.llp.LowerLayerProtocol; |
49 | |
import ca.uhn.hl7v2.llp.MinLowerLayerProtocol; |
50 | |
import ca.uhn.hl7v2.parser.Parser; |
51 | |
import ca.uhn.hl7v2.parser.PipeParser; |
52 | |
import ca.uhn.hl7v2.util.SocketFactory; |
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
public class TwoPortService extends HL7Service { |
62 | |
|
63 | 10 | private static final Logger log = LoggerFactory |
64 | 5 | .getLogger(TwoPortService.class); |
65 | |
|
66 | 20 | private Map<String, AcceptedSocket> waitingForSecondSocket = new HashMap<String, AcceptedSocket>(); |
67 | |
private int inboundPort; |
68 | |
private int outboundPort; |
69 | |
private boolean tls; |
70 | |
private BlockingQueue<AcceptedSocket> queue; |
71 | |
private AcceptorThread inboundAcceptor, outboundAcceptor; |
72 | |
private final HapiContext hapiContext; |
73 | |
|
74 | |
public TwoPortService(int inboundPort, int outboundPort) { |
75 | 10 | this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort, |
76 | |
outboundPort, false); |
77 | 10 | } |
78 | |
|
79 | |
public TwoPortService(int inboundPort, int outboundPort, boolean tls) { |
80 | 0 | this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort, |
81 | |
outboundPort, tls); |
82 | 0 | } |
83 | |
|
84 | |
|
85 | |
public TwoPortService(Parser parser, LowerLayerProtocol llp, |
86 | |
int inboundPort, int outboundPort, boolean tls) { |
87 | 20 | this(parser, llp, inboundPort, outboundPort, tls, |
88 | 10 | DefaultExecutorService.getDefaultService()); |
89 | 10 | } |
90 | |
|
91 | |
|
92 | |
public TwoPortService(HapiContext hapiContext, |
93 | |
int inboundPort, int outboundPort, boolean tls) { |
94 | 10 | super(hapiContext); |
95 | 10 | this.hapiContext = hapiContext; |
96 | 10 | this.queue = new LinkedBlockingQueue<AcceptedSocket>(); |
97 | 10 | this.inboundPort = inboundPort; |
98 | 10 | this.outboundPort = outboundPort; |
99 | 10 | this.tls = tls; |
100 | |
|
101 | 10 | if (inboundPort == outboundPort) { |
102 | 0 | throw new IllegalArgumentException("Inbound port and outbound port can not be the same"); |
103 | |
} |
104 | 10 | if (inboundPort < 1) { |
105 | 0 | throw new IllegalArgumentException("Invalid inbound port"); |
106 | |
} |
107 | 10 | if (outboundPort < 1) { |
108 | 0 | throw new IllegalArgumentException("Invalid outbound port"); |
109 | |
} |
110 | |
|
111 | 10 | } |
112 | |
|
113 | |
|
114 | |
public TwoPortService(Parser parser, LowerLayerProtocol llp, |
115 | |
int inboundPort, int outboundPort, boolean tls, |
116 | |
ExecutorService executorService) { |
117 | 10 | super(parser, llp, executorService); |
118 | 10 | this.hapiContext = new DefaultHapiContext(); |
119 | 10 | this.queue = new LinkedBlockingQueue<AcceptedSocket>(); |
120 | 10 | this.inboundPort = inboundPort; |
121 | 10 | this.outboundPort = outboundPort; |
122 | 10 | this.tls = tls; |
123 | 10 | } |
124 | |
|
125 | |
|
126 | |
|
127 | |
|
128 | |
|
129 | |
|
130 | |
|
131 | |
@Override |
132 | |
protected void afterStartup() { |
133 | |
try { |
134 | 15 | super.afterStartup(); |
135 | 15 | inboundAcceptor = createAcceptThread(inboundPort); |
136 | 15 | outboundAcceptor = createAcceptThread(outboundPort); |
137 | 15 | inboundAcceptor.start(); |
138 | 15 | outboundAcceptor.start(); |
139 | 30 | log.info("TwoPortService running on ports {} and {}", inboundPort, |
140 | 15 | outboundPort); |
141 | 0 | } catch (IOException e) { |
142 | 0 | log.error("Could not run TwoPortService on ports {} and {}", |
143 | 0 | inboundPort, outboundPort); |
144 | 0 | throw new RuntimeException(e); |
145 | 15 | } |
146 | 15 | } |
147 | |
|
148 | |
|
149 | |
|
150 | |
|
151 | |
|
152 | |
|
153 | |
@Override |
154 | |
protected void afterTermination() { |
155 | 15 | super.afterTermination(); |
156 | 15 | inboundAcceptor.stop(); |
157 | 15 | outboundAcceptor.stop(); |
158 | 15 | } |
159 | |
|
160 | |
|
161 | |
|
162 | |
|
163 | |
protected void handle() { |
164 | 122 | if (inboundAcceptor.getServiceExitedWithException() != null) { |
165 | 0 | setServiceExitedWithException(inboundAcceptor.getServiceExitedWithException()); |
166 | |
} |
167 | 122 | if (outboundAcceptor.getServiceExitedWithException() != null) { |
168 | 0 | setServiceExitedWithException(outboundAcceptor.getServiceExitedWithException()); |
169 | |
} |
170 | |
|
171 | |
try { |
172 | 122 | ActiveConnection conn = acceptConnection(queue.poll(2, TimeUnit.SECONDS)); |
173 | 122 | if (conn != null) { |
174 | 98 | log.info("Accepted connection from " |
175 | 49 | + conn.getRemoteAddress().getHostAddress()); |
176 | 49 | newConnection(conn); |
177 | |
} |
178 | 0 | } catch (Exception e) { |
179 | 0 | log.error("Error while accepting connections: ", e); |
180 | 122 | } |
181 | 122 | } |
182 | |
|
183 | |
|
184 | |
|
185 | |
|
186 | |
|
187 | |
|
188 | |
private ActiveConnection acceptConnection(AcceptedSocket newSocket) |
189 | |
throws LLPException, IOException { |
190 | 122 | ActiveConnection conn = null; |
191 | 122 | if (newSocket != null) { |
192 | 101 | String address = newSocket.socket.getInetAddress().getHostAddress(); |
193 | 101 | AcceptedSocket otherSocket = waitingForSecondSocket.remove(address); |
194 | 101 | if (otherSocket != null && otherSocket.origin != newSocket.origin) { |
195 | 49 | log.debug("Socket {} completes a two-port connection", |
196 | |
newSocket.socket); |
197 | 49 | Socket in = getInboundSocket(newSocket, otherSocket); |
198 | 49 | Socket out = getOutboundSocket(newSocket, otherSocket); |
199 | 49 | conn = new ActiveConnection(getParser(), getLlp(), in, out, |
200 | 49 | getExecutorService()); |
201 | 49 | } else { |
202 | 52 | log.debug( |
203 | |
"Registered {} Still waiting for second socket for two-port connection", |
204 | |
newSocket.socket); |
205 | 52 | waitingForSecondSocket.put(address, newSocket); |
206 | |
} |
207 | |
} |
208 | 122 | return conn; |
209 | |
} |
210 | |
|
211 | |
private Socket getInboundSocket(AcceptedSocket socket1, |
212 | |
AcceptedSocket socket2) { |
213 | 49 | return socket1.origin == inboundAcceptor ? socket1.socket |
214 | |
: socket2.socket; |
215 | |
} |
216 | |
|
217 | |
private Socket getOutboundSocket(AcceptedSocket socket1, |
218 | |
AcceptedSocket socket2) { |
219 | 49 | return socket1.origin == outboundAcceptor ? socket1.socket |
220 | |
: socket2.socket; |
221 | |
} |
222 | |
|
223 | |
protected AcceptorThread createAcceptThread(int port) |
224 | |
throws SocketException, IOException { |
225 | 30 | SocketFactory ss = this.hapiContext.getSocketFactory(); |
226 | 30 | return new AcceptorThread(port, tls, getExecutorService(), queue, ss); |
227 | |
} |
228 | |
|
229 | |
|
230 | |
|
231 | |
|
232 | |
|
233 | |
|
234 | |
|
235 | |
|
236 | |
public static void main(String args[]) { |
237 | 0 | if (args.length < 2 || args.length > 3) { |
238 | 0 | System.out |
239 | 0 | .println("Usage: ca.uhn.hl7v2.app.TwoPortService inbound_port outbound_port [application_spec_file_name]"); |
240 | 0 | System.exit(1); |
241 | |
} |
242 | |
|
243 | 0 | int inPort = 0; |
244 | 0 | int outPort = 0; |
245 | |
try { |
246 | 0 | inPort = Integer.parseInt(args[0]); |
247 | 0 | outPort = Integer.parseInt(args[1]); |
248 | 0 | } catch (NumberFormatException e) { |
249 | 0 | System.err.println("One of the given ports (" + args[0] + " or " |
250 | |
+ args[1] + ") is not an integer."); |
251 | 0 | System.exit(1); |
252 | 0 | } |
253 | |
|
254 | 0 | File appFile = null; |
255 | 0 | if (args.length == 3) { |
256 | 0 | appFile = new File(args[2]); |
257 | |
} |
258 | |
|
259 | |
try { |
260 | 0 | TwoPortService server = new TwoPortService(inPort, outPort); |
261 | 0 | if (appFile != null) |
262 | 0 | server.loadApplicationsFromFile(appFile); |
263 | 0 | server.start(); |
264 | 0 | } catch (Exception e) { |
265 | 0 | e.printStackTrace(); |
266 | 0 | } |
267 | |
|
268 | 0 | } |
269 | |
|
270 | |
} |