1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package ca.uhn.hl7v2.protocol.impl;
28
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import ca.uhn.hl7v2.HL7Exception;
39 import ca.uhn.hl7v2.preparser.PreParser;
40 import ca.uhn.hl7v2.protocol.Processor;
41 import ca.uhn.hl7v2.protocol.ProcessorContext;
42 import ca.uhn.hl7v2.protocol.TransportException;
43 import ca.uhn.hl7v2.protocol.TransportLayer;
44 import ca.uhn.hl7v2.protocol.Transportable;
45
46
47
48
49
50
51
52 public class ProcessorImpl implements Processor {
53
54 private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
55
56 private final ProcessorContext myContext;
57 private final Map<String, ExpiringTransportable> myAcceptAcks;
58 private final Map<String, Long> myReservations;
59 private final Map<String, ExpiringTransportable> myAvailableMessages;
60 private final boolean myThreaded;
61 private Cycler ackCycler;
62 private Cycler nonAckCycler;
63 private ExecutorService myResponseExecutorService;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
82 myContext = theContext;
83 myThreaded = isThreaded;
84 myAcceptAcks = new HashMap<>();
85 myReservations = new HashMap<>();
86 myAvailableMessages = new HashMap<>();
87
88 if (isThreaded) {
89 myResponseExecutorService = Executors.newSingleThreadExecutor();
90
91 TransportLayer local = theContext.getLocallyDrivenTransportLayer();
92 TransportLayer remote = theContext.getRemotelyDrivenTransportLayer();
93
94 ackCycler = new Cycler(this, true);
95 Thread ackThd = new Thread(ackCycler);
96 ackThd.start();
97
98 if (local != remote) {
99 nonAckCycler = new Cycler(this, false);
100 Thread nonAckThd = new Thread(nonAckCycler);
101 nonAckThd.start();
102 }
103
104 }
105 }
106
107
108
109
110 public void stop() {
111 if (myThreaded) {
112 ackCycler.stop();
113 if (nonAckCycler != null) {
114 nonAckCycler.stop();
115 }
116
117 myResponseExecutorService.shutdownNow();
118 }
119 }
120
121
122
123
124 public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
125 String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
126 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
127 String controlId = fields[0];
128 String needAcceptAck = fields[1];
129 String needAppAck = fields[2];
130
131 checkValidAckNeededCode(needAcceptAck);
132
133 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
134
135 boolean originalMode = (needAcceptAck == null && needAppAck == null);
136 if (originalMode || !NE.equals(needAcceptAck)) {
137
138 Transportable response = null;
139 int retries = 0;
140 do {
141 long until = System.currentTimeMillis() + retryIntervalMillis;
142 while (response == null && System.currentTimeMillis() < until) {
143 synchronized (this) {
144 ExpiringTransportable et = myAcceptAcks.remove(controlId);
145 if (et == null) {
146 cycleIfNeeded(true);
147 } else {
148 response = et.transportable;
149 }
150 }
151 sleepIfNeeded();
152 }
153
154 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
155 || (response != null && isReject(response))) {
156 log.info("Resending message {}", controlId);
157 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
158 response = null;
159 }
160
161 if (response != null && isError(response)) {
162 String[] errMsgPath = {"MSA-3"};
163 String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);
164 throw new HL7Exception("Error message received: " + errMsg[0]);
165 }
166
167 } while (response == null && ++retries <= maxRetries);
168 }
169 }
170
171 private void checkValidAckNeededCode(String theCode) throws HL7Exception {
172
173 if ( !(theCode == null || theCode.equals("")
174 ||theCode.equals(AL) || theCode.equals(ER)
175 || theCode.equals(NE) || theCode.equals(SU)) ) {
176 throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
177 }
178 }
179
180
181
182
183
184 private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
185 if (!myThreaded) {
186 cycle(expectingAck);
187 }
188 }
189
190
191
192
193 private void sleepIfNeeded() {
194 if (myThreaded) {
195 try {
196 Thread.sleep(1);
197 } catch (InterruptedException e) { }
198 }
199 }
200
201
202 private static boolean isReject(Transportable theMessage) throws HL7Exception {
203 boolean reject = false;
204 String[] fieldPaths = {"MSA-1"};
205 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
206 if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
207 reject = true;
208 }
209 return reject;
210 }
211
212
213 private static boolean isError(Transportable theMessage) throws HL7Exception {
214 boolean error = false;
215 String[] fieldPaths = {"MSA-1"};
216 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
217 if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
218 error = true;
219 }
220 return error;
221 }
222
223
224
225
226 public synchronized void reserve(String theAckId, long thePeriodMillis) {
227 Long expiry = System.currentTimeMillis() + thePeriodMillis;
228 myReservations.put(theAckId, expiry);
229 }
230
231
232
233
234 private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
235 try {
236 theTransport.send(theTransportable);
237 } catch (TransportException e) {
238 theTransport.disconnect();
239 theTransport.connect();
240 theTransport.send(theTransportable);
241 }
242 }
243
244
245
246
247
248 private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
249 Transportable message;
250 try {
251 message = theTransport.receive();
252 } catch (TransportException e) {
253 theTransport.disconnect();
254 theTransport.connect();
255 message = theTransport.receive();
256 }
257 return message;
258 }
259
260
261
262
263 public void cycle(boolean expectingAck) throws HL7Exception {
264 log.debug("In cycle({})", expectingAck);
265
266 cleanReservations();
267 cleanAcceptAcks();
268 cleanReservedMessages();
269
270 Transportable in;
271 try {
272 if (expectingAck) {
273 in = tryReceive(myContext.getLocallyDrivenTransportLayer());
274 } else {
275 in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
276 }
277 } catch (TransportException e) {
278 try {
279 Thread.sleep(1000);
280 } catch (InterruptedException ignored) {}
281 throw e;
282 }
283
284
285 if (in != null) {
286 log.debug("Received message: {}", in.getMessage());
287 } else {
288 log.debug("Received no message");
289 }
290
291
292 if (in != null) {
293 String acceptAckNeeded = null;
294
295 String ackCode = null;
296 String ackId = null;
297
298 try {
299 String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
300 String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);
301 acceptAckNeeded = fields[0];
302
303 ackCode = fields[2];
304 ackId = fields[3];
305 } catch (HL7Exception e) {
306 log.warn("Failed to parse accept ack fields in incoming message", e);
307 }
308
309 if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
310 long expiryTime = System.currentTimeMillis() + 1000 * 60;
311 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
312 } else {
313 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
314
315 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL))
316 || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable())
317 || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
318 trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());
319 }
320
321 if (ack.isAcceptable()) {
322 if (isReserved(ackId)) {
323
324 log.debug("Received expected ACK message with ACK ID: {}", ackId);
325
326 removeReservation(ackId);
327 long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;
328 myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
329
330 } else {
331
332 log.debug("Sending message to router");
333 Transportable out = myContext.getRouter().processMessage(in);
334 sendAppResponse(out);
335
336 }
337 } else {
338
339
340 log.warn("Incoming message was not acceptable");
341 }
342
343 }
344 } else {
345 String transport = expectingAck ? " Locally driven " : "Remotely driven";
346 log.debug("{} TransportLayer.receive() returned null.", transport);
347 }
348
349 sleepIfNeeded();
350
351 log.debug("Exiting cycle()");
352 }
353
354
355 private void sendAppResponse(final Transportable theResponse) {
356 final ProcessorImpl processor = this;
357 Runnable sender = () -> {
358 try {
359 log.debug("Sending response: {}", theResponse);
360
361
362 processor.send(theResponse, 2, 3000);
363
364 } catch (HL7Exception e) {
365 log.error("Error trying to send response from Application", e);
366 }
367 };
368
369 if (myThreaded) {
370 myResponseExecutorService.execute(sender);
371 } else {
372 sender.run();
373 }
374 }
375
376
377
378
379 private synchronized void cleanReservations() {
380 Iterator<String> it = myReservations.keySet().iterator();
381 while (it.hasNext()) {
382 String ackId = it.next();
383 Long expiry = myReservations.get(ackId);
384 if (System.currentTimeMillis() > expiry) {
385 it.remove();
386 }
387 }
388 }
389
390
391
392
393 private synchronized void cleanAcceptAcks() {
394 Iterator<String> it = myAcceptAcks.keySet().iterator();
395 while (it.hasNext()) {
396 String ackId = it.next();
397 ExpiringTransportable et = myAcceptAcks.get(ackId);
398 if (System.currentTimeMillis() > et.expiryTime) {
399 it.remove();
400 }
401 }
402 }
403
404 private synchronized void cleanReservedMessages() throws HL7Exception {
405 Iterator<String> it = myAvailableMessages.keySet().iterator();
406 while (it.hasNext()) {
407 String ackId = it.next();
408 ExpiringTransportable et = myAvailableMessages.get(ackId);
409 if (System.currentTimeMillis() > et.expiryTime) {
410 it.remove();
411
412
413 Transportable out = myContext.getRouter().processMessage(et.transportable);
414 sendAppResponse(out);
415 }
416 }
417 }
418
419 private synchronized boolean isReserved(String ackId) {
420 boolean reserved = false;
421 if (myReservations.containsKey(ackId)) {
422 reserved = true;
423 }
424 return reserved;
425 }
426
427 private synchronized void removeReservation(String ackId) {
428 myReservations.remove(ackId);
429 }
430
431
432
433
434
435 public boolean isAvailable(String theAckId) {
436 boolean available = false;
437 if (myAvailableMessages.containsKey(theAckId)) {
438 available = true;
439 }
440 return available;
441 }
442
443
444
445
446 public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
447 if (!isReserved(theAckId)) {
448 reserve(theAckId, theTimeoutMillis);
449 }
450
451 Transportable in = null;
452 long until = System.currentTimeMillis() + theTimeoutMillis;
453 do {
454 synchronized (this) {
455 ExpiringTransportable et = myAvailableMessages.get(theAckId);
456 if (et == null) {
457 cycleIfNeeded(false);
458 } else {
459 in = et.transportable;
460 }
461 }
462 sleepIfNeeded();
463 } while (in == null && System.currentTimeMillis() < until);
464 return in;
465 }
466
467
468
469
470 public ProcessorContext getContext() {
471 return myContext;
472 }
473
474
475
476
477
478
479
480 static class ExpiringTransportable {
481 public final Transportable transportable;
482 public final long expiryTime;
483
484 public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
485 transportable = theTransportable;
486 expiryTime = theExpiryTime;
487 }
488 }
489
490
491
492
493
494
495
496 private static class Cycler implements Runnable {
497
498 private final Processor myProcessor;
499 private final boolean myExpectingAck;
500 private boolean isRunning;
501
502
503
504
505
506 public Cycler(Processor theProcessor, boolean isExpectingAck) {
507 myProcessor = theProcessor;
508 myExpectingAck = isExpectingAck;
509 isRunning = true;
510 }
511
512
513
514
515 public void stop() {
516 isRunning = false;
517 }
518
519
520
521
522
523
524
525 public void run() {
526 while (isRunning) {
527 try {
528 myProcessor.cycle(myExpectingAck);
529 } catch (HL7Exception e) {
530 log.error("Error processing message", e);
531 }
532 }
533 }
534 }
535
536 }