001/*
002The contents of this file are subject to the Mozilla Public License Version 1.1 
003(the "License"); you may not use this file except in compliance with the License. 
004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005Software distributed under the License is distributed on an "AS IS" basis, 
006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007specific language governing rights and limitations under the License. 
008
009The Original Code is "ProcessorImpl.java".  Description: 
010"A default implementation of Processor." 
011
012The Initial Developer of the Original Code is University Health Network. Copyright (C) 
0132004.  All Rights Reserved. 
014
015Contributor(s): ______________________________________. 
016
017Alternatively, the contents of this file may be used under the terms of the 
GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
019applicable instead of those above.  If you wish to allow use of your version of this 
020file only under the terms of the GPL and not to allow others to use your version 
021of this file under the MPL, indicate your decision by deleting  the provisions above 
022and replace  them with the notice and other provisions required by the GPL License.  
023If you do not delete the provisions above, a recipient may use your version of 
024this file under either the MPL or the GPL. 
025*/
026
027package ca.uhn.hl7v2.protocol.impl;
028
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.Map;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import ca.uhn.hl7v2.HL7Exception;
039import ca.uhn.hl7v2.preparser.PreParser;
040import ca.uhn.hl7v2.protocol.Processor;
041import ca.uhn.hl7v2.protocol.ProcessorContext;
042import ca.uhn.hl7v2.protocol.TransportException;
043import ca.uhn.hl7v2.protocol.TransportLayer;
044import ca.uhn.hl7v2.protocol.Transportable;
045
046/**
047 * A default implementation of <code>Processor</code>.  
048 *  
049 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
050 * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
051 */
052public class ProcessorImpl implements Processor {
053
054    private static final Logger log = LoggerFactory.getLogger(ProcessorImpl.class);
055
056    private ProcessorContext myContext;
057    private final Map<String, ExpiringTransportable> myAcceptAcks;
058    private final Map<String, Long> myReservations;
059    private final Map<String, ExpiringTransportable> myAvailableMessages;
060    private boolean myThreaded; //true if separate threads are calling cycle()  
061    private Cycler ackCycler;
062    private Cycler nonAckCycler;
063    private ExecutorService myResponseExecutorService;
064    
065    /**
066     * @param theContext source of supporting services 
067     * @param isThreaded true if this class should create threads in which to call cycle(), and 
068     *  in which to send responses from Applications.  This is the preferred mode.  Use false 
069     *  if threading is not allowed, eg you are running the code in an EJB container.  In this case, 
070     *  the send() and receive() methods will call cycle() themselves as needed.  However, cycle() 
071     *  makes potentially blocking calls, so these methods may not return until the next message 
072     *  is received from the remote server, regardless of timeout.  Probably the worst example of this
073     *  would be if receive() was called to wait for an application ACK that was specified as "RE" (ie
074     *  required on error).  No response will be returned if the message is processed without error, 
075     *  and in a non-threaded environment, receive() will block forever.  Use true if you can, otherwise
076     *  study this class carefully.
077     *   
078     * TODO: write a MLLPTransport with non-blocking IO  
079     * TODO: reconnect transport layers on error and retry 
080     */
081    public ProcessorImpl(ProcessorContext theContext, boolean isThreaded) {
082        myContext = theContext;
083        myThreaded = isThreaded;
084        myAcceptAcks = new HashMap<String, ExpiringTransportable>();
085        myReservations = new HashMap<String, Long>();
086        myAvailableMessages = new HashMap<String, ExpiringTransportable>();
087        
088        if (isThreaded) {
089            myResponseExecutorService = Executors.newSingleThreadExecutor(); 
090
091            TransportLayer local = theContext.getLocallyDrivenTransportLayer();
092            TransportLayer remote = theContext.getRemotelyDrivenTransportLayer();
093            
094                ackCycler = new Cycler(this, true);
095            Thread ackThd = new Thread(ackCycler);
096            ackThd.start();
097            
098            if (local != remote) {
099                    nonAckCycler = new Cycler(this, false);
100                    Thread nonAckThd = new Thread(nonAckCycler);
101                    nonAckThd.start();
102            }
103            
104        }
105    }
106    
107    /**
108     * If self-threaded, stops threads that have been created.  
109     */
110    public void stop() {
111        if (myThreaded) {
112            ackCycler.stop();
113            if (nonAckCycler != null) {
114                nonAckCycler.stop();
115            }
116            
117            myResponseExecutorService.shutdownNow();
118        }
119    }
120
121    /**
122     * @see ca.uhn.hl7v2.protocol.Processor#send(ca.uhn.hl7v2.protocol.Transportable, int, long)
123     */
124    public void send(Transportable theMessage, int maxRetries, long retryIntervalMillis) throws HL7Exception {
125        String[] fieldPaths = {"MSH-10", "MSH-15", "MSH-16"};
126        String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
127        String controlId = fields[0];
128        String needAcceptAck = fields[1];
129        String needAppAck = fields[2];
130        
131        checkValidAckNeededCode(needAcceptAck);
132        
133        trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
134        
135        boolean originalMode = (needAcceptAck == null && needAppAck == null); 
136        if (originalMode || !NE.equals(needAcceptAck)) {
137        
138            Transportable response = null;
139            int retries = 0;
140            do {
141                long until = System.currentTimeMillis() + retryIntervalMillis;
142                while (response == null && System.currentTimeMillis() < until) {
143                    synchronized (this) {
144                        ExpiringTransportable et = myAcceptAcks.remove(controlId);
145                        if (et == null) {
146                            cycleIfNeeded(true);
147                        } else {
148                            response = et.transportable;
149                        }
150                    }
151                    sleepIfNeeded();
152                }
153                
154                if ((response == null && needAcceptAck != null && needAcceptAck.equals(AL))
155                        || (response != null && isReject(response))) {
156                    log.info("Resending message {}", controlId);
157                    trySend(myContext.getLocallyDrivenTransportLayer(), theMessage);
158                    response = null;                    
159                }
160                
161                if (response != null && isError(response)) {
162                    String[] errMsgPath = {"MSA-3"};
163                    String[] errMsg = PreParser.getFields(response.getMessage(), errMsgPath);                    
164                    throw new HL7Exception("Error message received: " + errMsg[0]);
165                }
166                
167            } while (response == null && ++retries <= maxRetries);
168        }
169    }
170    
171    private void checkValidAckNeededCode(String theCode) throws HL7Exception {
172        //must be one of the below ... 
173        if ( !(theCode == null || theCode.equals("") 
174                ||theCode.equals(AL) || theCode.equals(ER) 
175                || theCode.equals(NE) || theCode.equals(SU)) ) {
176            throw new HL7Exception("MSH-15 must be AL, ER, NE, or SU in the outgoing message");
177        }            
178    }
179    
180    /**
181     * Calls cycle() if we do not expect another thread to be doing so
182     * @param expectingAck as in cycle
183     */
184    private void cycleIfNeeded(boolean expectingAck) throws HL7Exception {
185        if (!myThreaded) {
186            cycle(expectingAck);
187        }        
188    }
189    
190    /**
191     * Sleeps for 1 ms if externally threaded (this is to let the CPU idle).   
192     */
193    private void sleepIfNeeded() {
194        if (myThreaded) {
195            try {
196                Thread.sleep(1);
197            } catch (InterruptedException e) { /* no problem */ }
198        }                
199    }
200    
201    /** Returns true if a CR or AR ACK */ 
202    private static boolean isReject(Transportable theMessage) throws HL7Exception {
203        boolean reject = false;
204        String[] fieldPaths = {"MSA-1"};
205        String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
206        if (fields[0] != null && (fields[0].equals(CR) || fields[0].equals(AR))) {
207            reject = true;
208        }        
209        return reject;
210    }
211
212    /** Returns true if a CE or AE ACK */ 
213    private static boolean isError(Transportable theMessage) throws HL7Exception {
214        boolean error = false;
215        String[] fieldPaths = {"MSA-1"};
216        String[] fields = PreParser.getFields(theMessage.getMessage(), fieldPaths);
217        if (fields[0] != null && (fields[0].equals(CE) || fields[0].equals(AE))) {
218            error = true;
219        }
220        return error;
221    }
222
223    /**
224     * @see ca.uhn.hl7v2.protocol.Processor#reserve(java.lang.String, long)
225     */
226    public synchronized void reserve(String theAckId, long thePeriodMillis) {
227        Long expiry = new Long(System.currentTimeMillis() + thePeriodMillis);
228        myReservations.put(theAckId, expiry);
229    }
230    
231    /**
232     * Tries to send the message, and if there is an error reconnects and tries again. 
233     */
234    private void trySend(TransportLayer theTransport, Transportable theTransportable) throws TransportException {
235        try {
236            theTransport.send(theTransportable);
237        } catch (TransportException e) {
238            theTransport.disconnect();
239            theTransport.connect();
240            theTransport.send(theTransportable);
241        }
242    }
243    
244    
245    /**
246     * Tries to receive a message, and if there is an error reconnects and tries again. 
247     */
248    private Transportable tryReceive(TransportLayer theTransport) throws TransportException {
249        Transportable message = null;
250        try {
251            message = theTransport.receive();            
252        } catch (TransportException e) {
253            theTransport.disconnect();
254            theTransport.connect();
255            message = theTransport.receive();
256        }
257        return message;
258    }
259
260    /** 
261     * @see ca.uhn.hl7v2.protocol.Processor#cycle(boolean)
262     */
263    public void cycle(boolean expectingAck) throws HL7Exception {
264        log.debug("In cycle({})", expectingAck);
265        
266        cleanReservations();
267        cleanAcceptAcks();
268        cleanReservedMessages();
269
270        Transportable in = null;
271        try {
272            if (expectingAck) {
273                in = tryReceive(myContext.getLocallyDrivenTransportLayer());
274            } else {
275                in = tryReceive(myContext.getRemotelyDrivenTransportLayer());
276            }
277        } catch (TransportException e) {
278            try {
279                Thread.sleep(1000);
280            } catch (InterruptedException e1) {}
281            throw e;
282        }
283        
284        // log
285        if (in != null) {
286               log.debug("Received message: {}", in.getMessage());
287        } else {
288                log.debug("Received no message");
289        }
290        
291        // If we have a message, handle it
292        if (in != null) { 
293            String acceptAckNeeded = null;
294//            String appAckNeeded = null;
295            String ackCode = null;
296            String ackId = null;
297            
298            try {
299                    String[] fieldPaths = {"MSH-15", "MSH-16", "MSA-1", "MSA-2"};
300                    String[] fields = PreParser.getFields(in.getMessage(), fieldPaths);         
301                                acceptAckNeeded = fields[0];
302//                              appAckNeeded = fields[1];
303                                ackCode = fields[2];
304                                ackId = fields[3];
305            } catch (HL7Exception e) {
306                log.warn("Failed to parse accept ack fields in incoming message", e);
307            }
308            
309            if (ackId != null && ackCode != null && ackCode.startsWith("C")) {
310                long expiryTime = System.currentTimeMillis() + 1000 * 60;
311                myAcceptAcks.put(ackId, new ExpiringTransportable(in, expiryTime));
312            } else {
313                AcceptAcknowledger.AcceptACK ack = AcceptAcknowledger.validate(getContext(), in);
314            
315                if ((acceptAckNeeded != null && acceptAckNeeded.equals(AL)) 
316                    || (acceptAckNeeded != null && acceptAckNeeded.equals(ER) && !ack.isAcceptable()) 
317                    || (acceptAckNeeded != null && acceptAckNeeded.equals(SU) && ack.isAcceptable())) {
318                    trySend(myContext.getRemotelyDrivenTransportLayer(), ack.getMessage());    
319                }
320  
321                if (ack.isAcceptable()) {
322                    if (isReserved(ackId)) {
323                        
324                        log.debug("Received expected ACK message with ACK ID: {}", ackId);
325                        
326                        removeReservation(ackId);
327                        long expiryTime = System.currentTimeMillis() + 1000 * 60 * 5;                
328                        myAvailableMessages.put(ackId, new ExpiringTransportable(in, expiryTime));
329                        
330                    } else {
331
332                        log.debug("Sending message to router");
333                        Transportable out = myContext.getRouter().processMessage(in);
334                        sendAppResponse(out);
335                        
336                    }
337                } else {
338                        // TODO: should we do something more here? Might be nice to 
339                        // allow a configurable handler for this situation
340                        log.warn("Incoming message was not acceptable");
341                }
342                
343            }
344        } else {
345            String transport = expectingAck ? " Locally driven " : "Remotely driven";
346            log.debug("{} TransportLayer.receive() returned null.", transport);
347        }
348        
349        sleepIfNeeded();
350
351        log.debug("Exiting cycle()");
352    }
353    
354    /** Sends in a new thread if isThreaded, otherwise in current thread */
355    private void sendAppResponse(final Transportable theResponse) {
356        final ProcessorImpl processor = this;
357        Runnable sender = new Runnable() {
358            public void run() {
359                try {
360                        log.debug("Sending response: {}", theResponse);
361                        
362                    //TODO: make configurable 
363                        processor.send(theResponse, 2, 3000);
364                        
365                } catch (HL7Exception e) {
366                    log.error("Error trying to send response from Application", e);
367                }
368            }
369        };
370        
371        if (myThreaded) {
372            myResponseExecutorService.execute(sender);
373        } else {
374            sender.run();
375        }
376    }
377    
378    /**
379     * Removes expired message reservations from the reservation list.  
380     */
381    private synchronized void cleanReservations() {
382        Iterator<String> it = myReservations.keySet().iterator();
383        while (it.hasNext()) {
384            String ackId = it.next();
385            Long expiry = myReservations.get(ackId);
386            if (System.currentTimeMillis() > expiry.longValue()) {
387                it.remove();
388            }
389        }
390    }
391    
392    /**
393     * Discards expired accept acknowledgements (these are used in retry protocol; see send()).   
394     */
395    private synchronized void cleanAcceptAcks() {
396        Iterator<String> it = myAcceptAcks.keySet().iterator();
397        while (it.hasNext()) {
398            String ackId = it.next();
399            ExpiringTransportable et = myAcceptAcks.get(ackId);
400            if (System.currentTimeMillis() > et.expiryTime) {
401                it.remove();
402            }
403        }        
404    }
405    
406    private synchronized void cleanReservedMessages() throws HL7Exception {
407        Iterator<String> it = myAvailableMessages.keySet().iterator();
408        while (it.hasNext()) {
409            String ackId = it.next();            
410            ExpiringTransportable et = myAvailableMessages.get(ackId);
411            if (System.currentTimeMillis() > et.expiryTime) {
412                it.remove();
413                
414                //send to an Application 
415                Transportable out = myContext.getRouter().processMessage(et.transportable);
416                sendAppResponse(out);                
417            }
418        }  
419    }
420    
421    private synchronized boolean isReserved(String ackId) {
422        boolean reserved = false;
423        if (myReservations.containsKey(ackId)) {
424            reserved = true;
425        }
426        return reserved;
427    }
428    
429    private synchronized void removeReservation(String ackId) {
430        myReservations.remove(ackId);
431    }
432    
433
434    /**
435     * @see ca.uhn.hl7v2.protocol.Processor#isAvailable(java.lang.String)
436     */
437    public boolean isAvailable(String theAckId) {
438        boolean available = false;
439        if (myAvailableMessages.containsKey(theAckId)) {
440            available = true;
441        }
442        return available;
443    }
444
445    /** 
446     * @see ca.uhn.hl7v2.protocol.Processor#receive(java.lang.String, long)
447     */
448    public Transportable receive(String theAckId, long theTimeoutMillis) throws HL7Exception {
449        if (!isReserved(theAckId)) {
450            reserve(theAckId, theTimeoutMillis);
451        }
452        
453        Transportable in = null;
454        long until = System.currentTimeMillis() + theTimeoutMillis;
455        do {
456            synchronized (this) {
457                ExpiringTransportable et = myAvailableMessages.get(theAckId);                
458                if (et == null) {
459                    cycleIfNeeded(false);
460                } else {
461                    in = et.transportable;
462                }
463            }
464            sleepIfNeeded();
465        } while (in == null && System.currentTimeMillis() < until);
466        return in;
467    }
468
469    /** 
470     * @see ca.uhn.hl7v2.protocol.Processor#getContext()
471     */
472    public ProcessorContext getContext() {
473        return myContext;
474    }
475    
476    /**
477     * A struct for Transportable collection entries that time out.  
478     *  
479     * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
480     * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
481     */
482    class ExpiringTransportable {
483        public Transportable transportable;
484        public long expiryTime;
485        
486        public ExpiringTransportable(Transportable theTransportable, long theExpiryTime) {
487            transportable = theTransportable;
488            expiryTime = theExpiryTime;
489        }
490    }
491    
492    /**
493     * A Runnable that repeatedly calls the cycle() method of this class.  
494     * 
495     * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
496     * @version $Revision: 1.4 $ updated on $Date: 2009-12-16 19:36:34 $ by $Author: jamesagnew $
497     */
498    private static class Cycler implements Runnable {
499
500        private Processor myProcessor;
501        private boolean myExpectingAck;
502        private boolean isRunning;
503        
504        /**
505         * @param theProcessor the processor on which to call cycle()
506         * @param isExpectingAck passed to cycle()
507         */
508        public Cycler(Processor theProcessor, boolean isExpectingAck) {
509            myProcessor = theProcessor;
510            myExpectingAck = isExpectingAck;
511            isRunning = true;
512        }
513        
514        /**
515         * Execution will stop at the end of the next cycle.  
516         */
517        public void stop() {
518            isRunning = false;
519        }
520        
521        /** 
522         * Calls cycle() repeatedly on the Processor given in the 
523         * constructor, until stop() is called.  
524         * 
525         * @see java.lang.Runnable#run()
526         */
527        public void run() {
528            while (isRunning) {
529                try {
530                    myProcessor.cycle(myExpectingAck);
531                } catch (HL7Exception e) {
532                    log.error("Error processing message", e);
533                }
534            }
535        }        
536    }
537
538}