1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package ca.uhn.hl7v2.app;
29
30 import ca.uhn.hl7v2.HL7Exception;
31 import ca.uhn.hl7v2.HapiContext;
32 import ca.uhn.hl7v2.app.Receiver.ReceiverParserExceptionHandler;
33 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
34 import ca.uhn.hl7v2.concurrent.Service;
35 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
36 import ca.uhn.hl7v2.model.Message;
37 import ca.uhn.hl7v2.parser.Parser;
38 import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
39 import ca.uhn.hl7v2.protocol.ReceivingApplication;
40 import ca.uhn.hl7v2.protocol.ReceivingApplicationExceptionHandler;
41 import ca.uhn.hl7v2.protocol.impl.AppRoutingDataImpl;
42 import ca.uhn.hl7v2.protocol.impl.AppWrapper;
43 import ca.uhn.hl7v2.protocol.impl.ApplicationRouterImpl;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import java.io.BufferedReader;
48 import java.io.File;
49 import java.io.FileReader;
50 import java.io.IOException;
51 import java.util.*;
52 import java.util.concurrent.ExecutorService;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class HL7Service extends Service {
68
69 private static final Logger log = LoggerFactory.getLogger(HL7Service.class);
70
71 private final List<Connection> connections;
72 private final Parser parser;
73 private final LowerLayerProtocol llp;
74 private final List<ConnectionListener> listeners;
75 private final ConnectionCleaner cleaner;
76 private final ApplicationRouterImpl applicationRouter;
77 private ReceiverParserExceptionHandler parserExeptionHandler;
78
79
80
81
/**
 * Creates a service whose parser, lower layer protocol and executor service
 * are all obtained from the given context.
 *
 * @param theHapiContext context supplying the generic parser, the lower
 *                       layer protocol and the executor service
 */
public HL7Service(HapiContext theHapiContext) {
    this(
            theHapiContext.getGenericParser(),
            theHapiContext.getLowerLayerProtocol(),
            theHapiContext.getExecutorService());
}
85
86
87
88
89
/**
 * Creates a service that runs on the executor service returned by
 * {@link DefaultExecutorService#getDefaultService()}.
 *
 * @param parser parser handed to the three-argument constructor
 * @param llp    lower layer protocol handed to the three-argument constructor
 */
public HL7Service(final Parser parser, final LowerLayerProtocol llp) {
    this(
            parser,
            llp,
            DefaultExecutorService.getDefaultService());
}
93
94
95
96
97
98
99
100
101 public HL7Service(Parser parser, LowerLayerProtocol llp,
102 ExecutorService executorService) {
103 super("HL7 Server", executorService);
104 this.connections = new ArrayList<>();
105 this.listeners = new ArrayList<>();
106 this.parser = parser;
107 this.llp = llp;
108 this.applicationRouter = new ApplicationRouterImpl(parser);
109 this.cleaner = new ConnectionCleaner(this);
110
111
112 assert !this.cleaner.isRunning();
113 }
114
115
116
117
118
119
120
121 @Override
122 protected void afterStartup() {
123
124
125 cleaner.start();
126 }
127
128
129
130
131
132
133
134 @Override
135 protected void afterTermination() {
136 super.afterTermination();
137 cleaner.stopAndWait();
138 for (Connection c : connections) {
139 try {
140 c.close();
141 } catch (IOException e) {
142 throw new RuntimeException(e);
143 }
144 }
145 }
146
147
148
149
150
151
152
153 protected boolean keepRunning() {
154 return isRunning();
155 }
156
157 LowerLayerProtocol getLlp() {
158 return llp;
159 }
160
161 Parser getParser() {
162 return parser;
163 }
164
165
166
167
168
169
170
171 public synchronized void newConnection(ActiveConnection c) {
172 c.getResponder().setApplicationRouter(applicationRouter);
173 c.setReceiverParserExeptionHandler(parserExeptionHandler);
174 c.activate();
175 connections.add(c);
176 notifyListeners(c);
177 }
178
179
180
181
182
183
184
185
186
187 public Connection getRemoteConnection(String ipAddress) {
188 Connection conn = null;
189 while (conn == null) {
190
191 int c = 0;
192 synchronized (this) {
193 while (conn == null && c < connections.size()) {
194 Connection nextConn = connections.get(c);
195 if (nextConn.getRemoteAddress().getHostAddress().equals(ipAddress))
196 conn = nextConn;
197 c++;
198 }
199 }
200
201 if (conn == null) {
202 try {
203 Thread.sleep(100);
204 } catch (InterruptedException e) {
205
206 }
207 }
208 }
209 return conn;
210 }
211
212
213
214
215
216
217 public synchronized List<Connection> getRemoteConnections() {
218 return connections;
219 }
220
221
222
223
224
225
226
227
228 public synchronized void registerConnectionListener(
229 ConnectionListener listener) {
230 listeners.add(listener);
231 }
232
233
234
235
236 private void notifyListeners(Connection c) {
237 for (ConnectionListener cl : listeners) {
238 if (c.isOpen()) {
239 cl.connectionReceived(c);
240 } else {
241 cl.connectionDiscarded(c);
242 }
243 }
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257 public synchronized void registerApplication(String messageType,
258 String triggerEvent, Application handler) {
259 ReceivingApplication<Message> handlerWrapper = new AppWrapper(handler);
260 applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handlerWrapper);
261 }
262
263
264
265
266
267
268
269
270
271 public void registerApplication(String messageType, String triggerEvent, ReceivingApplication handler) {
272 applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handler);
273 }
274
275
276
277
278
279 public synchronized void registerApplication(AppRoutingData appRouting, ReceivingApplication<? extends Message> application) {
280 if (appRouting == null) {
281 throw new NullPointerException("appRouting must not be null");
282 }
283 applicationRouter.bindApplication(appRouting, application);
284 }
285
286
287
288
289
290 public synchronized void registerApplication(ReceivingApplication<? extends Message> application) {
291
292 registerApplication(new AppRoutingDataImpl("*", "*", "*", "*"), application);
293 }
294
295
296
297
298
299
300
301 public synchronized boolean unregisterApplication(AppRoutingData appRouting) {
302 if (appRouting == null) {
303 throw new NullPointerException("appRouting must not be null");
304 }
305 return applicationRouter.unbindApplication(appRouting);
306 }
307
308
309
310
311
312
313
314 public synchronized boolean unregisterApplication(ReceivingApplication<? extends Message> application) {
315 if (application == null) {
316 throw new NullPointerException("application must not be null");
317 }
318 return applicationRouter.unbindApplication(application);
319 }
320
321
322
323
324
325
326 public synchronized void setExceptionHandler(ReceivingApplicationExceptionHandler exHandler) {
327 applicationRouter.setExceptionHandler(exHandler);
328 }
329
330
331
332
333
334 public void setParserExeptionHandler(ReceiverParserExceptionHandler parserExeptionHandler) {
335 this.parserExeptionHandler = parserExeptionHandler;
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371 public void loadApplicationsFromFile(File f) throws IOException,
372 HL7Exception, ClassNotFoundException, InstantiationException,
373 IllegalAccessException {
374 try (BufferedReader in = new BufferedReader(new FileReader(f))) {
375 String line;
376 while ((line = in.readLine()) != null) {
377
378 StringTokenizer tok = new StringTokenizer(line, "\t", false);
379 String type, event, className;
380
381 if (tok.hasMoreTokens()) {
382 try {
383 type = tok.nextToken();
384 event = tok.nextToken();
385 className = tok.nextToken();
386 } catch (NoSuchElementException ne) {
387 throw new HL7Exception(
388 "Can't register applications from file "
389 + f.getName()
390 + ". The line '"
391 + line
392 + "' is not of the form: message_type [tab] trigger_event [tab] application_class.");
393 }
394
395 try {
396 @SuppressWarnings("unchecked")
397 Class<? extends Application> appClass = (Class<? extends Application>) Class
398 .forName(className);
399
400 Application app = appClass.newInstance();
401 registerApplication(type, event, new AppWrapper(app));
402 } catch (ClassCastException cce) {
403 throw new HL7Exception("The specified class, " + className
404 + ", doesn't implement Application.");
405 }
406
407 }
408 }
409 }
410
411 }
412
413
414
415
416
417
418
419
420
421
422
423
424 private static class ConnectionCleaner extends Service {
425
426 private final HL7Service service;
427
428 public ConnectionCleaner(HL7Service service) {
429 super("ConnectionCleaner", service.getExecutorService());
430 this.service = service;
431 }
432
433 @Override
434 public void start() {
435 log.info("Starting ConnectionCleaner service");
436 super.start();
437 }
438
439 public void handle() {
440 try {
441 Thread.sleep(500);
442 synchronized (service) {
443 Iterator<Connection> it = service.getRemoteConnections()
444 .iterator();
445 while (it.hasNext()) {
446 Connection conn = it.next();
447 if (!conn.isOpen()) {
448 log.debug(
449 "Removing connection from {} from connection list",
450 conn.getRemoteAddress().getHostAddress());
451 it.remove();
452 service.notifyListeners(conn);
453 }
454 }
455 }
456 } catch (InterruptedException e) {
457
458 }
459 }
460
461 }
462
463 }