Coverage Report - ca.uhn.hl7v2.protocol.impl.ProcessorImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
ProcessorImpl
88%
186/210
72%
93/128
3.885
ProcessorImpl$1
71%
5/7
N/A
3.885
ProcessorImpl$Cycler
100%
13/13
100%
2/2
3.885
ProcessorImpl$ExpiringTransportable
100%
4/4
N/A
3.885
 
 1  
 /*
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "ProcessorImpl.java".  Description: 
 10  
 "A default implementation of Processor." 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2004.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): ______________________________________. 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
 */
 26  
 
 27  
 package ca.uhn.hl7v2.protocol.impl;
 28  
 
 29  
 import java.util.HashMap;
 30  
 import java.util.Iterator;
 31  
 import java.util.Map;
 32  
 import java.util.concurrent.ExecutorService;
 33  
 import java.util.concurrent.Executors;
 34  
 
 35  
 import org.slf4j.Logger;
 36  
 import org.slf4j.LoggerFactory;
 37  
 
 38  
 import ca.uhn.hl7v2.HL7Exception;
 39  
 import ca.uhn.hl7v2.preparser.PreParser;
 40  
 import ca.uhn.hl7v2.protocol.Processor;
 41  
 import ca.uhn.hl7v2.protocol.ProcessorContext;
 42  
 import ca.uhn.hl7v2.protocol.TransportException;
 43  
 import ca.uhn.hl7v2.protocol.TransportLayer;
 44  
 import ca.uhn.hl7v2.protocol.Transportable;
 45  
 
 46  
 /**
 47  
  * A default implementation of <code>Processor</code>.  
 48  
  *  
 49  
  * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
 50  
  * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
 51  
  */
 52  25
 public class ProcessorImpl implements Processor {
 53  
 
 54  5
     private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
 55  
 
 56  
     private ProcessorContext myContext;
 57  
     private final Map<String, ExpiringTransportable> myAcceptAcks;
 58  
     private final Map<String, Long> myReservations;
 59  
     private final Map<String, ExpiringTransportable> myAvailableMessages;
 60  
     private boolean myThreaded; //true if separate threads are calling cycle()  
 61  
     private Cycler ackCycler;
 62  
     private Cycler nonAckCycler;
 63  
     private ExecutorService myResponseExecutorService;
 64  
     
 65  
     /**
 66  
      * @param theContext source of supporting services 
 67  
      * @param isThreaded true if this class should create threads in which to call cycle(), and 
 68  
      *  in which to send responses from Applications.  This is the preferred mode.  Use false 
 69  
      *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
 70  
      *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
 71  
      *  makes potentially blocking calls, so these methods may not return until the next message 
 72  
      *  is received from the remote server, regardless of timeout.  Probably the worst example of this
 73  
      *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
 74  
      *  required on error).  No response will be returned if the message is processed without error, 
 75  
      *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
 76  
      *  study this class carefully.
 77  
      *   
 78  
      * TODO: write a MLLPTransport with non-blocking IO  
 79  
      * TODO: reconnect transport layers on error and retry 
 80  
      */
 81  70
     public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
 82  70
         myContext = theContext;
 83  70
         myThreaded = isThreaded;
 84  70
         myAcceptAcks = new HashMap<String, ExpiringTransportable>();
 85  70
         myReservations = new HashMap<String, Long>();
 86  70
         myAvailableMessages = new HashMap<String, ExpiringTransportable>();
 87  
         
 88  70
         if (isThreaded) {
 89  70
             myResponseExecutorService = Executors.newSingleThreadExecutor(); 
 90  
 
 91  70
             TransportLayer local = theContext.getLocallyDrivenTransportLayer();
 92  70
             TransportLayer remote = theContext.getRemotelyDrivenTransportLayer();
 93  
             
 94  70
                 ackCycler = new Cycler(this, true);
 95  70
             Thread ackThd = new Thread(ackCycler);
 96  70
             ackThd.start();
 97  
             
 98  70
             if (local != remote) {
 99  60
                     nonAckCycler = new Cycler(this, false);
 100  60
                     Thread nonAckThd = new Thread(nonAckCycler);
 101  60
                     nonAckThd.start();
 102  
             }
 103  
             
 104  
         }
 105  70
     }
 106  
     
 107  
     /**
 108  
      * If self-threaded, stops threads that have been created.  
 109  
      */
 110  
     public void stop() {
 111  65
         if (myThreaded) {
 112  65
             ackCycler.stop();
 113  65
             if (nonAckCycler != null) {
 114  60
                     nonAckCycler.stop();
 115  
             }
 116  
             
 117  65
             myResponseExecutorService.shutdownNow();
 118  
         }
 119  65
     }
 120  
 
 121  
     /**
 122  
      * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
 123  
      */
 124  
     public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
 125  60
         String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
 126  60
         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
 127  60
         String controlId = fields[0];
 128  60
         String needAcceptAck = fields[1];
 129  60
         String needAppAck = fields[2];
 130  
         
 131  60
         checkValidAckNeededCode(needAcceptAck);
 132  
         
 133  60
         trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
 134  
         
 135  60
         boolean originalMode = (needAcceptAck == null && needAppAck == null); 
 136  60
         if (originalMode || !NE.equals(needAcceptAck)) {
 137  
         
 138  50
             Transportable response = null;
 139  50
             int retries = 0;
 140  
             do {
 141  115
                 long until = System.currentTimeMillis() + retryIntervalMillis;
 142  199625
                 while (response == null && System.currentTimeMillis() < until) {
 143  199510
                     synchronized (this) {
 144  199510
                         ExpiringTransportable et = myAcceptAcks.remove(controlId);
 145  199510
                         if (et == null) {
 146  199480
                             cycleIfNeeded(true);
 147  
                         } else {
 148  30
                             response = et.transportable;
 149  
                         }
 150  199510
                     }
 151  199510
                     sleepIfNeeded();
 152  
                 }
 153  
                 
 154  115
                 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
 155  30
                         || (response != null && isReject(response))) {
 156  10
                     log.info("Resending message {}", controlId);
 157  10
                     trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
 158  10
                     response = null;                    
 159  
                 }
 160  
                 
 161  115
                 if (response != null && isError(response)) {
 162  10
                     String[] errMsgPath = {"MSA-3"};
 163  10
                     String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
 164  10
                     throw new HL7Exception("Error message received: " + errMsg[0]);
 165  
                 }
 166  
                 
 167  105
             } while (response == null && ++retries <= maxRetries);
 168  
         }
 169  50
     }
 170  
     
 171  
     private void checkValidAckNeededCode(String theCode) throws HL7Exception {
 172  
         //must be one of the below ... 
 173  60
         if ( !(theCode == null || theCode.equals("") 
 174  40
                 ||theCode.equals(AL) || theCode.equals(ER) 
 175  10
                 || theCode.equals(NE) || theCode.equals(SU)) ) {
 176  0
             throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
 177  
         }            
 178  60
     }
 179  
     
 180  
     /**
 181  
      * Calls cycle() if we do not expect another thread to be doing so
 182  
      * @param expectingAck as in cycle
 183  
      */
 184  
     private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
 185  228256
         if (!myThreaded) {
 186  0
             cycle(expectingAck);
 187  
         }        
 188  228256
     }
 189  
     
 190  
     /**
 191  
      * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).   
 192  
      */
 193  
     private void sleepIfNeeded() {
 194  229822
         if (myThreaded) {
 195  
             try {
 196  229822
                 Thread.sleep(1);
 197  229812
             } catch (InterruptedException e) { /* no problem */ }
 198  
         }                
 199  229822
     }
 200  
     
 201  
     /** Returns true if a CR or AR ACK */ 
 202  
     private static boolean isReject(Transportable theMessage) throws HL7Exception {
 203  30
         boolean reject = false;
 204  30
         String[] fieldPaths = {"MSA-1"};
 205  30
         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
 206  30
         if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
 207  5
             reject = true;
 208  
         }        
 209  30
         return reject;
 210  
     }
 211  
 
 212  
     /** Returns true if a CE or AE ACK */ 
 213  
     private static boolean isError(Transportable theMessage) throws HL7Exception {
 214  25
         boolean error = false;
 215  25
         String[] fieldPaths = {"MSA-1"};
 216  25
         String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
 217  25
         if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
 218  10
             error = true;
 219  
         }
 220  25
         return error;
 221  
     }
 222  
 
 223  
     /**
 224  
      * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
 225  
      */
 226  
     public synchronized void reserve(String theAckId, long thePeriodMillis) {
 227  20
         Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
 228  20
         myReservations.put(theAckId, expiry);
 229  20
     }
 230  
     
 231  
     /**
 232  
      * Tries to send the message, and if there is an error reconnects and tries again. 
 233  
      */
 234  
     private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
 235  
         try {
 236  70
             theTransport.send(theTransportable);
 237  0
         } catch (TransportException e) {
 238  0
             theTransport.disconnect();
 239  0
             theTransport.connect();
 240  0
             theTransport.send(theTransportable);
 241  70
         }
 242  70
     }
 243  
     
 244  
     
 245  
     /**
 246  
      * Tries to receive a message, and if there is an error reconnects and tries again. 
 247  
      */
 248  
     private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
 249  1546
         Transportable message = null;
 250  
         try {
 251  1546
             message = theTransport.receive();            
 252  70
         } catch (TransportException e) {
 253  70
             theTransport.disconnect();
 254  70
             theTransport.connect();
 255  60
             message = theTransport.receive();
 256  1476
         }
 257  1536
         return message;
 258  
     }
 259  
 
 260  
     /** 
 261  
      * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
 262  
      */
 263  
     public void cycle(boolean expectingAck) throws HL7Exception {
 264  1546
         log.debug("In cycle({})", expectingAck);
 265  
             
 266  1546
             cleanReservations();
 267  1546
         cleanAcceptAcks();
 268  1546
         cleanReservedMessages();
 269  
 
 270  1546
         Transportable in = null;
 271  
         try {
 272  1546
             if (expectingAck) {
 273  833
                 in = tryReceive(myContext.getLocallyDrivenTransportLayer());
 274  
             } else {
 275  713
                 in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
 276  
             }
 277  0
         } catch (TransportException e) {
 278  
             try {
 279  0
                 Thread.sleep(1000);
 280  0
             } catch (InterruptedException e1) {}
 281  0
             throw e;
 282  1536
         }
 283  
         
 284  
         // log
 285  1536
         if (in != null) {
 286  60
                log.debug("Received message: {}", in.getMessage());
 287  
         } else {
 288  1476
                 log.debug("Received no message");
 289  
         }
 290  
         
 291  
         // If we have a message, handle it
 292  1536
         if (in != null) { 
 293  60
             String acceptAckNeeded = null;
 294  
 //            String appAckNeeded = null;
 295  60
             String ackCode = null;
 296  60
             String ackId = null;
 297  
             
 298  
             try {
 299  60
                     String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
 300  60
                     String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
 301  55
                                 acceptAckNeeded = fields[0];
 302  
 //                                appAckNeeded = fields[1];
 303  55
                                 ackCode = fields[2];
 304  55
                                 ackId = fields[3];
 305  5
             } catch (HL7Exception e) {
 306  5
                     log.warn("Failed to parse accept ack fields in incoming message", e);
 307  55
             }
 308  
             
 309  60
             if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
 310  30
                 long expiryTime = System.currentTimeMillis() + 1000 * 60;
 311  30
                 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
 312  30
             } else {
 313  30
                 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
 314  
             
 315  25
                 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
 316  0
                     || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
 317  0
                     || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
 318  0
                     trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
 319  
                 }
 320  
   
 321  25
                 if (ack.isAcceptable()) {
 322  25
                     if (isReserved(ackId)) {
 323  
                             
 324  5
                             log.debug("Received expected ACK message with ACK ID: {}", ackId);
 325  
                             
 326  5
                         removeReservation(ackId);
 327  5
                         long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
 328  5
                         myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
 329  
                         
 330  5
                     } else {
 331  
 
 332  20
                             log.debug("Sending message to router");
 333  20
                         Transportable out = myContext.getRouter().processMessage(in);
 334  20
                         sendAppResponse(out);
 335  
                         
 336  20
                     }
 337  
                 } else {
 338  
                         // TODO: should we do something more here? Might be nice to 
 339  
                         // allow a configurable handler for this situation
 340  0
                         log.warn("Incoming message was not acceptable");
 341  
                 }
 342  
                 
 343  
             }
 344  55
         } else {
 345  1476
             String transport = expectingAck ? " Locally driven " : "Remotely driven";
 346  1476
             log.debug("{} TransportLayer.receive() returned null.", transport);
 347  
         }
 348  
         
 349  1531
         sleepIfNeeded();
 350  
 
 351  1531
         log.debug("Exiting cycle()");
 352  1531
     }
 353  
     
 354  
     /** Sends in a new thread if isThreaded, otherwise in current thread */
 355  
     private void sendAppResponse(final Transportable theResponse) {
 356  20
         final ProcessorImpl processor = this;
 357  20
         Runnable sender = new Runnable() {
 358  
             public void run() {
 359  
                 try {
 360  20
                         log.debug("Sending response: {}", theResponse);
 361  
                         
 362  
                     //TODO: make configurable 
 363  20
                         processor.send(theResponse, 2, 3000);
 364  
                         
 365  0
                 } catch (HL7Exception e) {
 366  0
                     log.error("Error trying to send response from Application", e);
 367  20
                 }
 368  20
             }
 369  
         };
 370  
         
 371  20
         if (myThreaded) {
 372  20
             myResponseExecutorService.execute(sender);
 373  
         } else {
 374  0
             sender.run();
 375  
         }
 376  20
     }
 377  
     
 378  
     /**
 379  
      * Removes expired message reservations from the reservation list.  
 380  
      */
 381  
     private synchronized void cleanReservations() {
 382  1546
         Iterator<String> it = myReservations.keySet().iterator();
 383  2153
         while (it.hasNext()) {
 384  607
             String ackId = it.next();
 385  607
             Long expiry = myReservations.get(ackId);
 386  607
             if (System.currentTimeMillis() > expiry.longValue()) {
 387  0
                 it.remove();
 388  
             }
 389  607
         }
 390  1546
     }
 391  
     
 392  
     /**
 393  
      * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
 394  
      */
 395  
     private synchronized void cleanAcceptAcks() {
 396  1546
         Iterator<String> it = myAcceptAcks.keySet().iterator();
 397  1554
         while (it.hasNext()) {
 398  8
             String ackId = it.next();
 399  8
             ExpiringTransportable et = myAcceptAcks.get(ackId);
 400  8
             if (System.currentTimeMillis() > et.expiryTime) {
 401  0
                 it.remove();
 402  
             }
 403  8
         }        
 404  1546
     }
 405  
     
 406  
     private synchronized void cleanReservedMessages() throws HL7Exception {
 407  1546
         Iterator<String> it = myAvailableMessages.keySet().iterator();
 408  1551
         while (it.hasNext()) {
 409  5
             String ackId = it.next();            
 410  5
             ExpiringTransportable et = myAvailableMessages.get(ackId);
 411  5
             if (System.currentTimeMillis() > et.expiryTime) {
 412  0
                 it.remove();
 413  
                 
 414  
                 //send to an Application 
 415  0
                 Transportable out = myContext.getRouter().processMessage(et.transportable);
 416  0
                 sendAppResponse(out);                
 417  
             }
 418  5
         }  
 419  1546
     }
 420  
     
 421  
     private synchronized boolean isReserved(String ackId) {
 422  45
         boolean reserved = false;
 423  45
         if (myReservations.containsKey(ackId)) {
 424  5
             reserved = true;
 425  
         }
 426  45
         return reserved;
 427  
     }
 428  
     
 429  
     private synchronized void removeReservation(String ackId) {
 430  5
         myReservations.remove(ackId);
 431  5
     }
 432  
     
 433  
 
 434  
     /**
 435  
      * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
 436  
      */
 437  
     public boolean isAvailable(String theAckId) {
 438  0
         boolean available = false;
 439  0
         if (myAvailableMessages.containsKey(theAckId)) {
 440  0
             available = true;
 441  
         }
 442  0
         return available;
 443  
     }
 444  
 
 445  
     /** 
 446  
      * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
 447  
      */
 448  
     public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
 449  20
         if (!isReserved(theAckId)) {
 450  20
             reserve(theAckId, theTimeoutMillis);
 451  
         }
 452  
         
 453  20
         Transportable in = null;
 454  20
         long until = System.currentTimeMillis() + theTimeoutMillis;
 455  
         do {
 456  28781
             synchronized (this) {
 457  28781
                 ExpiringTransportable et = myAvailableMessages.get(theAckId);                
 458  28781
                 if (et == null) {
 459  28776
                     cycleIfNeeded(false);
 460  
                 } else {
 461  5
                     in = et.transportable;
 462  
                 }
 463  28781
             }
 464  28781
             sleepIfNeeded();
 465  28781
         } while (in == null && System.currentTimeMillis() < until);
 466  20
         return in;
 467  
     }
 468  
 
 469  
     /** 
 470  
      * @see ca.uhn.hl7v2.protocol.Processor#getContext()
 471  
      */
 472  
     public ProcessorContext getContext() {
 473  45
         return myContext;
 474  
     }
 475  
     
 476  
     /**
 477  
      * A struct for Transportable collection entries that time out.  
 478  
      *  
 479  
      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
 480  
      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
 481  
      */
 482  
     class ExpiringTransportable {
 483  
         public Transportable transportable;
 484  
         public long expiryTime;
 485  
         
 486  35
         public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
 487  35
             transportable = theTransportable;
 488  35
             expiryTime = theExpiryTime;
 489  35
         }
 490  
     }
 491  
     
 492  
     /**
 493  
      * A Runnable that repeatedly calls the cycle() method of this class.  
 494  
      * 
 495  
      * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
 496  
      * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
 497  
      */
 498  
     private static class Cycler implements Runnable {
 499  
 
 500  
         private Processor myProcessor;
 501  
         private boolean myExpectingAck;
 502  
         private boolean isRunning;
 503  
         
 504  
         /**
 505  
          * @param theProcessor the processor on which to call cycle()
 506  
          * @param isExpectingAck passed to cycle()
 507  
          */
 508  130
         public Cycler(Processor theProcessor, boolean isExpectingAck) {
 509  130
             myProcessor = theProcessor;
 510  130
             myExpectingAck = isExpectingAck;
 511  130
             isRunning = true;
 512  130
         }
 513  
         
 514  
         /**
 515  
          * Execution will stop at the end of the next cycle.  
 516  
          */
 517  
         public void stop() {
 518  125
             isRunning = false;
 519  125
         }
 520  
         
 521  
         /** 
 522  
          * Calls cycle() repeatedly on the Processor given in the 
 523  
          * constructor, until stop() is called.  
 524  
          * 
 525  
          * @see java.lang.Runnable#run()
 526  
          */
 527  
         public void run() {
 528  1666
             while (isRunning) {
 529  
                 try {
 530  1546
                     myProcessor.cycle(myExpectingAck);
 531  5
                 } catch (HL7Exception e) {
 532  5
                     log.error("Error processing message", e);
 533  1536
                 }
 534  
             }
 535  120
         }        
 536  
     }
 537  
 
 538  
 }