001/* 002The contents of this file are subject to the Mozilla Public License Version 1.1 003(the "License"); you may not use this file except in compliance with the License. 004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005Software distributed under the License is distributed on an "AS IS" basis, 006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007specific language governing rights and limitations under the License. 008 009The Original Code is "ProcessorImpl.java". Description: 010"A default implementation of Processor." 011 012The Initial Developer of the Original Code is University Health Network. Copyright (C) 0132004. All Rights Reserved. 014 015Contributor(s): ______________________________________. 016 017Alternatively, the contents of this file may be used under the terms of the 018GNU General Public License (the �GPL�), in which case the provisions of the GPL are 019applicable instead of those above. If you wish to allow use of your version of this 020file only under the terms of the GPL and not to allow others to use your version 021of this file under the MPL, indicate your decision by deleting the provisions above 022and replace them with the notice and other provisions required by the GPL License. 023If you do not delete the provisions above, a recipient may use your version of 024this file under either the MPL or the GPL. 025*/ 026 027package ca.uhn.hl7v2.protocol.impl; 028 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.Map; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import ca.uhn.hl7v2.HL7Exception; 039import ca.uhn.hl7v2.preparser.PreParser; 040import ca.uhn.hl7v2.protocol.Processor; 041import ca.uhn.hl7v2.protocol.ProcessorContext; 042import ca.uhn.hl7v2.protocol.TransportException; 043import ca.uhn.hl7v2.protocol.TransportLayer; 044import ca.uhn.hl7v2.protocol.Transportable; 045 046/** 047 * A default implementation of <code>Processor</code>. 048 * 049 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a> 050 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $ 051 */ 052public class ProcessorImpl implements Processor { 053 054 private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class); 055 056 private ProcessorContext myContext; 057 private final Map<String, ExpiringTransportable> myAcceptAcks; 058 private final Map<String, Long> myReservations; 059 private final Map<String, ExpiringTransportable> myAvailableMessages; 060 private boolean myThreaded; //true if separate threads are calling cycle() 061 private Cycler ackCycler; 062 private Cycler nonAckCycler; 063 private ExecutorService myResponseExecutorService; 064 065 /** 066 * @param theContext source of supporting services 067 * @param isThreaded true if this class should create threads in which to call cycle(), and 068 * in which to send responses from Applications. This is the preferred mode. Use false 069 * if threading is not allowed, eg you are running the code in an EJB container. In this case, 070 * the send() and receive() methods will call cycle() themselves as needed. However, cycle() 071 * makes potentially blocking calls, so these methods may not return until the next message 072 * is received from the remote server, regardless of timeout. Probably the worst example of this 073 * would be if receive() was called to wait for an application ACK that was specified as "RE" (ie 074 * required on error). No response will be returned if the message is processed without error, 075 * and in a non-threaded environment, receive() will block forever. Use true if you can, otherwise 076 * study this class carefully. 077 * 078 * TODO: write a MLLPTransport with non-blocking IO 079 * TODO: reconnect transport layers on error and retry 080 */ 081 public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) { 082 myContext = theContext; 083 myThreaded = isThreaded; 084 myAcceptAcks = new HashMap<String, ExpiringTransportable>(); 085 myReservations = new HashMap<String, Long>(); 086 myAvailableMessages = new HashMap<String, ExpiringTransportable>(); 087 088 if (isThreaded) { 089 myResponseExecutorService = Executors.newSingleThreadExecutor(); 090 091 TransportLayer local = theContext.getLocallyDrivenTransportLayer(); 092 TransportLayer remote = theContext.getRemotelyDrivenTransportLayer(); 093 094 ackCycler = new Cycler(this, true); 095 Thread ackThd = new Thread(ackCycler); 096 ackThd.start(); 097 098 if (local != remote) { 099 nonAckCycler = new Cycler(this, false); 100 Thread nonAckThd = new Thread(nonAckCycler); 101 nonAckThd.start(); 102 } 103 104 } 105 } 106 107 /** 108 * If self-threaded, stops threads that have been created. 109 */ 110 public void stop() { 111 if (myThreaded) { 112 ackCycler.stop(); 113 if (nonAckCycler != null) { 114 nonAckCycler.stop(); 115 } 116 117 myResponseExecutorService.shutdownNow(); 118 } 119 } 120 121 /** 122 * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long) 123 */ 124 public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception { 125 String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"}; 126 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths); 127 String controlId = fields[0]; 128 String needAcceptAck = fields[1]; 129 String needAppAck = fields[2]; 130 131 checkValidAckNeededCode(needAcceptAck); 132 133 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage); 134 135 boolean originalMode = (needAcceptAck == null && needAppAck == null); 136 if (originalMode || !NE.equals(needAcceptAck)) { 137 138 Transportable response = null; 139 int retries = 0; 140 do { 141 long until = System.currentTimeMillis() + retryIntervalMillis; 142 while (response == null && System.currentTimeMillis() < until) { 143 synchronized (this) { 144 ExpiringTransportable et = myAcceptAcks.remove(controlId); 145 if (et == null) { 146 cycleIfNeeded(true); 147 } else { 148 response = et.transportable; 149 } 150 } 151 sleepIfNeeded(); 152 } 153 154 if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL)) 155 || (response != null && isReject(response))) { 156 log.info("Resending message {}", controlId); 157 trySend(myContext.getLocallyDrivenTransportLayer(), theMessage); 158 response = null; 159 } 160 161 if (response != null && isError(response)) { 162 String[] errMsgPath = {"MSA-3"}; 163 String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath); 164 throw new HL7Exception("Error message received: " + errMsg[0]); 165 } 166 167 } while (response == null && ++retries <= maxRetries); 168 } 169 } 170 171 private void checkValidAckNeededCode(String theCode) throws HL7Exception { 172 //must be one of the below ... 173 if ( !(theCode == null || theCode.equals("") 174 ||theCode.equals(AL) || theCode.equals(ER) 175 || theCode.equals(NE) || theCode.equals(SU)) ) { 176 throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message"); 177 } 178 } 179 180 /** 181 * Calls cycle() if we do not expect another thread to be doing so 182 * @param expectingAck as in cycle 183 */ 184 private void cycleIfNeeded(boolean expectingAck) throws HL7Exception { 185 if (!myThreaded) { 186 cycle(expectingAck); 187 } 188 } 189 190 /** 191 * Sleeps for 1 ms if externally threaded (this is to let the CPU idle). 192 */ 193 private void sleepIfNeeded() { 194 if (myThreaded) { 195 try { 196 Thread.sleep(1); 197 } catch (InterruptedException e) { /* no problem */ } 198 } 199 } 200 201 /** Returns true if a CR or AR ACK */ 202 private static boolean isReject(Transportable theMessage) throws HL7Exception { 203 boolean reject = false; 204 String[] fieldPaths = {"MSA-1"}; 205 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths); 206 if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) { 207 reject = true; 208 } 209 return reject; 210 } 211 212 /** Returns true if a CE or AE ACK */ 213 private static boolean isError(Transportable theMessage) throws HL7Exception { 214 boolean error = false; 215 String[] fieldPaths = {"MSA-1"}; 216 String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths); 217 if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) { 218 error = true; 219 } 220 return error; 221 } 222 223 /** 224 * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long) 225 */ 226 public synchronized void reserve(String theAckId, long thePeriodMillis) { 227 Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis); 228 myReservations.put(theAckId, expiry); 229 } 230 231 /** 232 * Tries to send the message, and if there is an error reconnects and tries again. 233 */ 234 private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException { 235 try { 236 theTransport.send(theTransportable); 237 } catch (TransportException e) { 238 theTransport.disconnect(); 239 theTransport.connect(); 240 theTransport.send(theTransportable); 241 } 242 } 243 244 245 /** 246 * Tries to receive a message, and if there is an error reconnects and tries again. 247 */ 248 private Transportable tryReceive(TransportLayer theTransport) throws TransportException { 249 Transportable message = null; 250 try { 251 message = theTransport.receive(); 252 } catch (TransportException e) { 253 theTransport.disconnect(); 254 theTransport.connect(); 255 message = theTransport.receive(); 256 } 257 return message; 258 } 259 260 /** 261 * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean) 262 */ 263 public void cycle(boolean expectingAck) throws HL7Exception { 264 log.debug("In cycle({})", expectingAck); 265 266 cleanReservations(); 267 cleanAcceptAcks(); 268 cleanReservedMessages(); 269 270 Transportable in = null; 271 try { 272 if (expectingAck) { 273 in = tryReceive(myContext.getLocallyDrivenTransportLayer()); 274 } else { 275 in = tryReceive(myContext.getRemotelyDrivenTransportLayer()); 276 } 277 } catch (TransportException e) { 278 try { 279 Thread.sleep(1000); 280 } catch (InterruptedException e1) {} 281 throw e; 282 } 283 284 // log 285 if (in != null) { 286 log.debug("Received message: {}", in.getMessage()); 287 } else { 288 log.debug("Received no message"); 289 } 290 291 // If we have a message, handle it 292 if (in != null) { 293 String acceptAckNeeded = null; 294// String appAckNeeded = null; 295 String ackCode = null; 296 String ackId = null; 297 298 try { 299 String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"}; 300 String[] fields = PreParser.getFields(in.getMessage(), fieldPaths); 301 acceptAckNeeded = fields[0]; 302// appAckNeeded = fields[1]; 303 ackCode = fields[2]; 304 ackId = fields[3]; 305 } catch (HL7Exception e) { 306 log.warn("Failed to parse accept ack fields in incoming message", e); 307 } 308 309 if (ackId != null && ackCode != null && ackCode.startsWith("C")) { 310 long expiryTime = System.currentTimeMillis() + 1000 * 60; 311 myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime)); 312 } else { 313 AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in); 314 315 if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 316 || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 317 || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) { 318 trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage()); 319 } 320 321 if (ack.isAcceptable()) { 322 if (isReserved(ackId)) { 323 324 log.debug("Received expected ACK message with ACK ID: {}", ackId); 325 326 removeReservation(ackId); 327 long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5; 328 myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime)); 329 330 } else { 331 332 log.debug("Sending message to router"); 333 Transportable out = myContext.getRouter().processMessage(in); 334 sendAppResponse(out); 335 336 } 337 } else { 338 // TODO: should we do something more here? Might be nice to 339 // allow a configurable handler for this situation 340 log.warn("Incoming message was not acceptable"); 341 } 342 343 } 344 } else { 345 String transport = expectingAck ? " Locally driven " : "Remotely driven"; 346 log.debug("{} TransportLayer.receive() returned null.", transport); 347 } 348 349 sleepIfNeeded(); 350 351 log.debug("Exiting cycle()"); 352 } 353 354 /** Sends in a new thread if isThreaded, otherwise in current thread */ 355 private void sendAppResponse(final Transportable theResponse) { 356 final ProcessorImpl processor = this; 357 Runnable sender = new Runnable() { 358 public void run() { 359 try { 360 log.debug("Sending response: {}", theResponse); 361 362 //TODO: make configurable 363 processor.send(theResponse, 2, 3000); 364 365 } catch (HL7Exception e) { 366 log.error("Error trying to send response from Application", e); 367 } 368 } 369 }; 370 371 if (myThreaded) { 372 myResponseExecutorService.execute(sender); 373 } else { 374 sender.run(); 375 } 376 } 377 378 /** 379 * Removes expired message reservations from the reservation list. 380 */ 381 private synchronized void cleanReservations() { 382 Iterator<String> it = myReservations.keySet().iterator(); 383 while (it.hasNext()) { 384 String ackId = it.next(); 385 Long expiry = myReservations.get(ackId); 386 if (System.currentTimeMillis() > expiry.longValue()) { 387 it.remove(); 388 } 389 } 390 } 391 392 /** 393 * Discards expired accept acknowledgements (these are used in retry protocol; see send()). 394 */ 395 private synchronized void cleanAcceptAcks() { 396 Iterator<String> it = myAcceptAcks.keySet().iterator(); 397 while (it.hasNext()) { 398 String ackId = it.next(); 399 ExpiringTransportable et = myAcceptAcks.get(ackId); 400 if (System.currentTimeMillis() > et.expiryTime) { 401 it.remove(); 402 } 403 } 404 } 405 406 private synchronized void cleanReservedMessages() throws HL7Exception { 407 Iterator<String> it = myAvailableMessages.keySet().iterator(); 408 while (it.hasNext()) { 409 String ackId = it.next(); 410 ExpiringTransportable et = myAvailableMessages.get(ackId); 411 if (System.currentTimeMillis() > et.expiryTime) { 412 it.remove(); 413 414 //send to an Application 415 Transportable out = myContext.getRouter().processMessage(et.transportable); 416 sendAppResponse(out); 417 } 418 } 419 } 420 421 private synchronized boolean isReserved(String ackId) { 422 boolean reserved = false; 423 if (myReservations.containsKey(ackId)) { 424 reserved = true; 425 } 426 return reserved; 427 } 428 429 private synchronized void removeReservation(String ackId) { 430 myReservations.remove(ackId); 431 } 432 433 434 /** 435 * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String) 436 */ 437 public boolean isAvailable(String theAckId) { 438 boolean available = false; 439 if (myAvailableMessages.containsKey(theAckId)) { 440 available = true; 441 } 442 return available; 443 } 444 445 /** 446 * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long) 447 */ 448 public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception { 449 if (!isReserved(theAckId)) { 450 reserve(theAckId, theTimeoutMillis); 451 } 452 453 Transportable in = null; 454 long until = System.currentTimeMillis() + theTimeoutMillis; 455 do { 456 synchronized (this) { 457 ExpiringTransportable et = myAvailableMessages.get(theAckId); 458 if (et == null) { 459 cycleIfNeeded(false); 460 } else { 461 in = et.transportable; 462 } 463 } 464 sleepIfNeeded(); 465 } while (in == null && System.currentTimeMillis() < until); 466 return in; 467 } 468 469 /** 470 * @see ca.uhn.hl7v2.protocol.Processor#getContext() 471 */ 472 public ProcessorContext getContext() { 473 return myContext; 474 } 475 476 /** 477 * A struct for Transportable collection entries that time out. 478 * 479 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a> 480 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $ 481 */ 482 class ExpiringTransportable { 483 public Transportable transportable; 484 public long expiryTime; 485 486 public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) { 487 transportable = theTransportable; 488 expiryTime = theExpiryTime; 489 } 490 } 491 492 /** 493 * A Runnable that repeatedly calls the cycle() method of this class. 494 * 495 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a> 496 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $ 497 */ 498 private static class Cycler implements Runnable { 499 500 private Processor myProcessor; 501 private boolean myExpectingAck; 502 private boolean isRunning; 503 504 /** 505 * @param theProcessor the processor on which to call cycle() 506 * @param isExpectingAck passed to cycle() 507 */ 508 public Cycler(Processor theProcessor, boolean isExpectingAck) { 509 myProcessor = theProcessor; 510 myExpectingAck = isExpectingAck; 511 isRunning = true; 512 } 513 514 /** 515 * Execution will stop at the end of the next cycle. 516 */ 517 public void stop() { 518 isRunning = false; 519 } 520 521 /** 522 * Calls cycle() repeatedly on the Processor given in the 523 * constructor, until stop() is called. 524 * 525 * @see java.lang.Runnable#run() 526 */ 527 public void run() { 528 while (isRunning) { 529 try { 530 myProcessor.cycle(myExpectingAck); 531 } catch (HL7Exception e) { 532 log.error("Error processing message", e); 533 } 534 } 535 } 536 } 537 538}