View Javadoc
1   /*
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "ProcessorImpl.java".  Description: 
10  "A default implementation of Processor." 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2004.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25  */
26  
27  package ca.uhn.hl7v2.protocol.impl;
28  
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.Map;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import ca.uhn.hl7v2.HL7Exception;
39  import ca.uhn.hl7v2.preparser.PreParser;
40  import ca.uhn.hl7v2.protocol.Processor;
41  import ca.uhn.hl7v2.protocol.ProcessorContext;
42  import ca.uhn.hl7v2.protocol.TransportException;
43  import ca.uhn.hl7v2.protocol.TransportLayer;
44  import ca.uhn.hl7v2.protocol.Transportable;
45  
46  /**
47   * A default implementation of <code>Processor</code>.  
48   *  
49   * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
50   * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
51   */
52  public class ProcessorImpl implements Processor {
53  
54      private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
55  
56      private final ProcessorContext myContext;
57      private final Map<String, ExpiringTransportable> myAcceptAcks;
58      private final Map<String, Long> myReservations;
59      private final Map<String, ExpiringTransportable> myAvailableMessages;
60      private final boolean myThreaded; //true if separate threads are calling cycle()
61      private Cycler ackCycler;
62      private Cycler nonAckCycler;
63      private ExecutorService myResponseExecutorService;
64      
65      /**
66       * @param theContext source of supporting services 
67       * @param isThreaded true if this class should create threads in which to call cycle(), and 
68       *  in which to send responses from Applications.  This is the preferred mode.  Use false 
69       *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
70       *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
71       *  makes potentially blocking calls, so these methods may not return until the next message 
72       *  is received from the remote server, regardless of timeout.  Probably the worst example of this
73       *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
74       *  required on error).  No response will be returned if the message is processed without error, 
75       *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
76       *  study this class carefully.
77       *   
78       * TODO: write a MLLPTransport with non-blocking IO  
79       * TODO: reconnect transport layers on error and retry 
80       */
81      public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
82          myContext = theContext;
83          myThreaded = isThreaded;
84          myAcceptAcks = new HashMap<>();
85          myReservations = new HashMap<>();
86          myAvailableMessages = new HashMap<>();
87          
88          if (isThreaded) {
89              myResponseExecutorService = Executors.newSingleThreadExecutor(); 
90  
91              TransportLayer local = theContext.getLocallyDrivenTransportLayer();
92              TransportLayer remote = theContext.getRemotelyDrivenTransportLayer();
93              
94          	ackCycler = new Cycler(this, true);
95              Thread ackThd = new Thread(ackCycler);
96              ackThd.start();
97              
98              if (local != remote) {
99  	            nonAckCycler = new Cycler(this, false);
100 	            Thread nonAckThd = new Thread(nonAckCycler);
101 	            nonAckThd.start();
102             }
103             
104         }
105     }
106     
107     /**
108      * If self-threaded, stops threads that have been created.  
109      */
110     public void stop() {
111         if (myThreaded) {
112             ackCycler.stop();
113             if (nonAckCycler != null) {
114             	nonAckCycler.stop();
115             }
116             
117             myResponseExecutorService.shutdownNow();
118         }
119     }
120 
121     /**
122      * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
123      */
124     public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
125         String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
126         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
127         String controlId = fields[0];
128         String needAcceptAck = fields[1];
129         String needAppAck = fields[2];
130         
131         checkValidAckNeededCode(needAcceptAck);
132         
133         trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
134         
135         boolean originalMode = (needAcceptAck == null && needAppAck == null); 
136         if (originalMode || !NE.equals(needAcceptAck)) {
137         
138             Transportable response = null;
139             int retries = 0;
140             do {
141                 long until = System.currentTimeMillis() + retryIntervalMillis;
142                 while (response == null && System.currentTimeMillis() < until) {
143                     synchronized (this) {
144                         ExpiringTransportable et = myAcceptAcks.remove(controlId);
145                         if (et == null) {
146                             cycleIfNeeded(true);
147                         } else {
148                             response = et.transportable;
149                         }
150                     }
151                     sleepIfNeeded();
152                 }
153                 
154                 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
155                         || (response != null && isReject(response))) {
156                     log.info("Resending message {}", controlId);
157                     trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
158                     response = null;                    
159                 }
160                 
161                 if (response != null && isError(response)) {
162                     String[] errMsgPath = {"MSA-3"};
163                     String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
164                     throw new HL7Exception("Error message received: " + errMsg[0]);
165                 }
166                 
167             } while (response == null && ++retries <= maxRetries);
168         }
169     }
170     
171     private void checkValidAckNeededCode(String theCode) throws HL7Exception {
172         //must be one of the below ... 
173         if ( !(theCode == null || theCode.equals("") 
174                 ||theCode.equals(AL) || theCode.equals(ER) 
175                 || theCode.equals(NE) || theCode.equals(SU)) ) {
176             throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
177         }            
178     }
179     
180     /**
181      * Calls cycle() if we do not expect another thread to be doing so
182      * @param expectingAck as in cycle
183      */
184     private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
185         if (!myThreaded) {
186             cycle(expectingAck);
187         }        
188     }
189     
190     /**
191      * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).   
192      */
193     private void sleepIfNeeded() {
194         if (myThreaded) {
195             try {
196                 Thread.sleep(1);
197             } catch (InterruptedException e) { /* no problem */ }
198         }                
199     }
200     
201     /** Returns true if a CR or AR ACK */ 
202     private static boolean isReject(Transportable theMessage) throws HL7Exception {
203         boolean reject = false;
204         String[] fieldPaths = {"MSA-1"};
205         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
206         if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
207             reject = true;
208         }        
209         return reject;
210     }
211 
212     /** Returns true if a CE or AE ACK */ 
213     private static boolean isError(Transportable theMessage) throws HL7Exception {
214         boolean error = false;
215         String[] fieldPaths = {"MSA-1"};
216         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
217         if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
218             error = true;
219         }
220         return error;
221     }
222 
223     /**
224      * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
225      */
226     public synchronized void reserve(String theAckId, long thePeriodMillis) {
227         Long expiry = System.currentTimeMillis() + thePeriodMillis;
228         myReservations.put(theAckId, expiry);
229     }
230     
231     /**
232      * Tries to send the message, and if there is an error reconnects and tries again. 
233      */
234     private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
235         try {
236             theTransport.send(theTransportable);
237         } catch (TransportException e) {
238             theTransport.disconnect();
239             theTransport.connect();
240             theTransport.send(theTransportable);
241         }
242     }
243     
244     
245     /**
246      * Tries to receive a message, and if there is an error reconnects and tries again. 
247      */
248     private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
249         Transportable message;
250         try {
251             message = theTransport.receive();            
252         } catch (TransportException e) {
253             theTransport.disconnect();
254             theTransport.connect();
255             message = theTransport.receive();
256         }
257         return message;
258     }
259 
260     /** 
261      * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
262      */
263     public void cycle(boolean expectingAck) throws HL7Exception {
264         log.debug("In cycle({})", expectingAck);
265     	
266     	cleanReservations();
267         cleanAcceptAcks();
268         cleanReservedMessages();
269 
270         Transportable in;
271         try {
272             if (expectingAck) {
273                 in = tryReceive(myContext.getLocallyDrivenTransportLayer());
274             } else {
275                 in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
276             }
277         } catch (TransportException e) {
278             try {
279                 Thread.sleep(1000);
280             } catch (InterruptedException ignored) {}
281             throw e;
282         }
283         
284         // log
285         if (in != null) {
286                log.debug("Received message: {}", in.getMessage());
287         } else {
288         	log.debug("Received no message");
289         }
290         
291         // If we have a message, handle it
292         if (in != null) { 
293             String acceptAckNeeded = null;
294 //            String appAckNeeded = null;
295             String ackCode = null;
296             String ackId = null;
297             
298             try {
299 	            String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
300 	            String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
301 				acceptAckNeeded = fields[0];
302 //				appAckNeeded = fields[1];
303 				ackCode = fields[2];
304 				ackId = fields[3];
305             } catch (HL7Exception e) {
306             	log.warn("Failed to parse accept ack fields in incoming message", e);
307             }
308             
309             if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
310                 long expiryTime = System.currentTimeMillis() + 1000 * 60;
311                 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
312             } else {
313                 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
314             
315                 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
316                     || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
317                     || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
318                     trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
319                 }
320   
321                 if (ack.isAcceptable()) {
322                     if (isReserved(ackId)) {
323                     	
324                     	log.debug("Received expected ACK message with ACK ID: {}", ackId);
325                     	
326                         removeReservation(ackId);
327                         long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
328                         myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
329                         
330                     } else {
331 
332                     	log.debug("Sending message to router");
333                         Transportable out = myContext.getRouter().processMessage(in);
334                         sendAppResponse(out);
335                         
336                     }
337                 } else {
338                 	// TODO: should we do something more here? Might be nice to 
339                 	// allow a configurable handler for this situation
340                 	log.warn("Incoming message was not acceptable");
341                 }
342                 
343             }
344         } else {
345             String transport = expectingAck ? " Locally driven " : "Remotely driven";
346             log.debug("{} TransportLayer.receive() returned null.", transport);
347         }
348         
349         sleepIfNeeded();
350 
351         log.debug("Exiting cycle()");
352     }
353     
354     /** Sends in a new thread if isThreaded, otherwise in current thread */
355     private void sendAppResponse(final Transportable theResponse) {
356         final ProcessorImpl processor = this;
357         Runnable sender = () -> {
358             try {
359                 log.debug("Sending response: {}", theResponse);
360 
361                 //TODO: make configurable
362                 processor.send(theResponse, 2, 3000);
363 
364             } catch (HL7Exception e) {
365                 log.error("Error trying to send response from Application", e);
366             }
367         };
368         
369         if (myThreaded) {
370             myResponseExecutorService.execute(sender);
371         } else {
372             sender.run();
373         }
374     }
375     
376     /**
377      * Removes expired message reservations from the reservation list.  
378      */
379     private synchronized void cleanReservations() {
380         Iterator<String> it = myReservations.keySet().iterator();
381         while (it.hasNext()) {
382             String ackId = it.next();
383             Long expiry = myReservations.get(ackId);
384             if (System.currentTimeMillis() > expiry) {
385                 it.remove();
386             }
387         }
388     }
389     
390     /**
391      * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
392      */
393     private synchronized void cleanAcceptAcks() {
394         Iterator<String> it = myAcceptAcks.keySet().iterator();
395         while (it.hasNext()) {
396             String ackId = it.next();
397             ExpiringTransportable et = myAcceptAcks.get(ackId);
398             if (System.currentTimeMillis() > et.expiryTime) {
399                 it.remove();
400             }
401         }        
402     }
403     
404     private synchronized void cleanReservedMessages() throws HL7Exception {
405         Iterator<String> it = myAvailableMessages.keySet().iterator();
406         while (it.hasNext()) {
407             String ackId = it.next();            
408             ExpiringTransportable et = myAvailableMessages.get(ackId);
409             if (System.currentTimeMillis() > et.expiryTime) {
410                 it.remove();
411                 
412                 //send to an Application 
413                 Transportable out = myContext.getRouter().processMessage(et.transportable);
414                 sendAppResponse(out);                
415             }
416         }  
417     }
418     
419     private synchronized boolean isReserved(String ackId) {
420         boolean reserved = false;
421         if (myReservations.containsKey(ackId)) {
422             reserved = true;
423         }
424         return reserved;
425     }
426     
427     private synchronized void removeReservation(String ackId) {
428         myReservations.remove(ackId);
429     }
430     
431 
432     /**
433      * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
434      */
435     public boolean isAvailable(String theAckId) {
436         boolean available = false;
437         if (myAvailableMessages.containsKey(theAckId)) {
438             available = true;
439         }
440         return available;
441     }
442 
443     /** 
444      * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
445      */
446     public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
447         if (!isReserved(theAckId)) {
448             reserve(theAckId, theTimeoutMillis);
449         }
450         
451         Transportable in = null;
452         long until = System.currentTimeMillis() + theTimeoutMillis;
453         do {
454             synchronized (this) {
455                 ExpiringTransportable et = myAvailableMessages.get(theAckId);                
456                 if (et == null) {
457                     cycleIfNeeded(false);
458                 } else {
459                     in = et.transportable;
460                 }
461             }
462             sleepIfNeeded();
463         } while (in == null && System.currentTimeMillis() < until);
464         return in;
465     }
466 
467     /** 
468      * @see ca.uhn.hl7v2.protocol.Processor#getContext()
469      */
470     public ProcessorContext getContext() {
471         return myContext;
472     }
473     
474     /**
475      * A struct for Transportable collection entries that time out.  
476      *  
477      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
478      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
479      */
480     static class ExpiringTransportable {
481         public final Transportable transportable;
482         public final long expiryTime;
483         
484         public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
485             transportable = theTransportable;
486             expiryTime = theExpiryTime;
487         }
488     }
489     
490     /**
491      * A Runnable that repeatedly calls the cycle() method of this class.  
492      * 
493      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
494      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
495      */
496     private static class Cycler implements Runnable {
497 
498         private final Processor myProcessor;
499         private final boolean myExpectingAck;
500         private boolean isRunning;
501         
502         /**
503          * @param theProcessor the processor on which to call cycle()
504          * @param isExpectingAck passed to cycle()
505          */
506         public Cycler(Processor theProcessor, boolean isExpectingAck) {
507             myProcessor = theProcessor;
508             myExpectingAck = isExpectingAck;
509             isRunning = true;
510         }
511         
512         /**
513          * Execution will stop at the end of the next cycle.  
514          */
515         public void stop() {
516             isRunning = false;
517         }
518         
519         /** 
520          * Calls cycle() repeatedly on the Processor given in the 
521          * constructor, until stop() is called.  
522          * 
523          * @see java.lang.Runnable#run()
524          */
525         public void run() {
526             while (isRunning) {
527                 try {
528                     myProcessor.cycle(myExpectingAck);
529                 } catch (HL7Exception e) {
530                     log.error("Error processing message", e);
531                 }
532             }
533         }        
534     }
535 
536 }