Coverage Report - ca.uhn.hl7v2.app.HL7Service
 
Classes in this File Line Coverage Branch Coverage Complexity
HL7Service
43%
44/101
28%
9/32
2.083
HL7Service$ConnectionCleaner
100%
22/22
100%
4/4
2.083
 
 1  
 /**
 2  
  The contents of this file are subject to the Mozilla Public License Version 1.1
 3  
  (the "License"); you may not use this file except in compliance with the License.
 4  
  You may obtain a copy of the License at http://www.mozilla.org/MPL/
 5  
  Software distributed under the License is distributed on an "AS IS" basis,
 6  
  WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
 7  
  specific language governing rights and limitations under the License.
 8  
 
 9  
  The Original Code is "HL7Service.java".  Description:
 10  
  "Accepts incoming TCP/IP connections and creates Connection objects"
 11  
 
 12  
  The Initial Developer of the Original Code is University Health Network. Copyright (C)
 13  
  2001.  All Rights Reserved.
 14  
 
 15  
  Contributor(s): Kyle Buza
 16  
 
 17  
  Alternatively, the contents of this file may be used under the terms of the
 18  
  GNU General Public License (the  �GPL�), in which case the provisions of the GPL are
 19  
  applicable instead of those above.  If you wish to allow use of your version of this
 20  
  file only under the terms of the GPL and not to allow others to use your version
 21  
  of this file under the MPL, indicate your decision by deleting  the provisions above
 22  
  and replace  them with the notice and other provisions required by the GPL License.
 23  
  If you do not delete the provisions above, a recipient may use your version of
 24  
  this file under either the MPL or the GPL.
 25  
 
 26  
  */
 27  
 
 28  
 package ca.uhn.hl7v2.app;
 29  
 
 30  
 import ca.uhn.hl7v2.HL7Exception;
 31  
 import ca.uhn.hl7v2.HapiContext;
 32  
 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
 33  
 import ca.uhn.hl7v2.concurrent.Service;
 34  
 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
 35  
 import ca.uhn.hl7v2.model.Message;
 36  
 import ca.uhn.hl7v2.parser.Parser;
 37  
 import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
 38  
 import ca.uhn.hl7v2.protocol.ReceivingApplication;
 39  
 import ca.uhn.hl7v2.protocol.ReceivingApplicationExceptionHandler;
 40  
 import ca.uhn.hl7v2.protocol.impl.AppRoutingDataImpl;
 41  
 import ca.uhn.hl7v2.protocol.impl.AppWrapper;
 42  
 import ca.uhn.hl7v2.protocol.impl.ApplicationRouterImpl;
 43  
 import org.slf4j.Logger;
 44  
 import org.slf4j.LoggerFactory;
 45  
 
 46  
 import java.io.BufferedReader;
 47  
 import java.io.File;
 48  
 import java.io.FileReader;
 49  
 import java.io.IOException;
 50  
 import java.util.*;
 51  
 import java.util.concurrent.ExecutorService;
 52  
 
 53  
 /**
 54  
  * <p>
 55  
  * An HL7 service. Accepts incoming TCP/IP connections and creates Connection
 56  
  * objects. Uses a single ApplicationRouter object (for all Connections) to
 57  
  * define the Applications to which message are sent. To configure, use
 58  
  * registerApplication() or loadApplicationsFromFile().
 59  
  * </p>
 60  
  * </p>A separate thread looks for Connections that have been closed (locally or
 61  
  * remotely) and discards them. </p>
 62  
  *
 63  
  * @author Bryan Tripp
 64  
  * @author Christian Ohr
 65  
  */
 66  464
 public abstract class HL7Service extends Service {
 67  
 
 68  5
     private static final Logger log = LoggerFactory.getLogger(HL7Service.class);
 69  
 
 70  
     private final List<Connection> connections;
 71  
     private final Parser parser;
 72  
     private final LowerLayerProtocol llp;
 73  
     private final List<ConnectionListener> listeners;
 74  
     private final ConnectionCleaner cleaner;
 75  
     private final ApplicationRouterImpl applicationRouter;
 76  
 
 77  
     /**
 78  
      * @param theHapiContext HapiContext
 79  
      */
 80  
     public HL7Service(HapiContext theHapiContext) {
 81  115
         this(theHapiContext.getGenericParser(), theHapiContext.getLowerLayerProtocol(), theHapiContext.getExecutorService());
 82  115
     }
 83  
 
 84  
     /**
 85  
      * @param parser parser to be used
 86  
      * @param llp    LowerLayerProtocol
 87  
      */
 88  
     public HL7Service(final Parser parser, final LowerLayerProtocol llp) {
 89  0
         this(parser, llp, DefaultExecutorService.getDefaultService());
 90  0
     }
 91  
 
 92  
     /**
 93  
      * Creates a new instance of Server
 94  
      *
 95  
      * @param parser          parser to be used
 96  
      * @param llp             LowerLayerProtocol
 97  
      * @param executorService executor used for starting threads
 98  
      */
 99  
     public HL7Service(Parser parser, LowerLayerProtocol llp,
 100  
                       ExecutorService executorService) {
 101  145
         super("HL7 Server", executorService);
 102  145
         this.connections = new ArrayList<Connection>();
 103  145
         this.listeners = new ArrayList<ConnectionListener>();
 104  145
         this.parser = parser;
 105  145
         this.llp = llp;
 106  145
         this.applicationRouter = new ApplicationRouterImpl(parser);
 107  145
         this.cleaner = new ConnectionCleaner(this);
 108  
 
 109  
         // 960101
 110  145
         assert !this.cleaner.isRunning();
 111  145
     }
 112  
 
 113  
     /**
 114  
      * Called after startup before the thread enters its main loop. This
 115  
      * implementation launches a cleaner thread that removes stale connections
 116  
      * from the connection list. Override to initialize resources for the
 117  
      * running thread, e.g. opening {@link java.net.ServerSocket}s etc.
 118  
      */
 119  
     @Override
 120  
     protected void afterStartup() {
 121  
         // Fix for bug 960101: Don't start the cleaner thread until the
 122  
         // server is started.
 123  135
         cleaner.start();
 124  135
     }
 125  
 
 126  
     /**
 127  
      * Called after the thread has left its main loop. This implementation stops
 128  
      * the connection cleaner thread and closes any open connections. Override
 129  
      * to clean up additional resources from the running thread, e.g. closing
 130  
      * {@link java.net.ServerSocket}s.
 131  
      */
 132  
     @Override
 133  
     protected void afterTermination() {
 134  105
         super.afterTermination();
 135  105
         cleaner.stopAndWait();
 136  105
         for (Connection c : connections) {
 137  62
             c.close();
 138  62
         }
 139  105
     }
 140  
 
 141  
     /**
 142  
      * Returns true if the thread should continue to run, false otherwise (ie if
 143  
      * stop() has been called).
 144  
      *
 145  
      * @deprecated Use {@link #isRunning()}. Deprecated as of version 0.6.
 146  
      */
 147  
     protected boolean keepRunning() {
 148  0
         return isRunning();
 149  
     }
 150  
 
 151  
     LowerLayerProtocol getLlp() {
 152  234
         return llp;
 153  
     }
 154  
 
 155  
     Parser getParser() {
 156  234
         return parser;
 157  
     }
 158  
 
 159  
     /**
 160  
      * Called by subclasses when a new Connection is made. Registers the
 161  
      * ApplicationRouter with the given Connection and stores it.
 162  
      *
 163  
      * @param c existing connection
 164  
      */
 165  
     public synchronized void newConnection(ActiveConnection c) {
 166  234
         c.getResponder().setApplicationRouter(applicationRouter);
 167  234
         c.activate();
 168  234
         connections.add(c); // keep track of connections
 169  234
         notifyListeners(c);
 170  234
     }
 171  
 
 172  
     /**
 173  
      * Returns a connection to a remote host that was initiated by the given
 174  
      * remote host. If the connection has not been made, this method blocks
 175  
      * until the remote host connects.
 176  
      *
 177  
      * @param ipAddress IP Address
 178  
      * @return connection that was initiated by the given address
 179  
      */
 180  
     public Connection getRemoteConnection(String ipAddress) {
 181  0
         Connection conn = null;
 182  0
         while (conn == null) {
 183  
             // check all connections ...
 184  0
             int c = 0;
 185  0
             synchronized (this) {
 186  0
                 while (conn == null && c < connections.size()) {
 187  0
                     Connection nextConn = connections.get(c);
 188  0
                     if (nextConn.getRemoteAddress().getHostAddress().equals(ipAddress))
 189  0
                         conn = nextConn;
 190  0
                     c++;
 191  0
                 }
 192  0
             }
 193  
 
 194  0
             if (conn == null) {
 195  
                 try {
 196  0
                     Thread.sleep(100);
 197  0
                 } catch (InterruptedException e) {
 198  
                     // don't care
 199  0
                 }
 200  
             }
 201  0
         }
 202  0
         return conn;
 203  
     }
 204  
 
 205  
     /**
 206  
      * Returns all currently active connections.
 207  
      *
 208  
      * @return list of active remote connections
 209  
      */
 210  
     public synchronized List<Connection> getRemoteConnections() {
 211  3325
         return connections;
 212  
     }
 213  
 
 214  
     /**
 215  
      * Registers the given ConnectionListener with the HL7Service - when a
 216  
      * remote host makes a new Connection, all registered listeners will be
 217  
      * notified.
 218  
      *
 219  
      * @param listener connection listener to be called
 220  
      */
 221  
     public synchronized void registerConnectionListener(
 222  
             ConnectionListener listener) {
 223  5
         listeners.add(listener);
 224  5
     }
 225  
 
 226  
     /**
 227  
      * Notifies all listeners that a Connection is new or discarded.
 228  
      */
 229  
     private void notifyListeners(Connection c) {
 230  396
         for (ConnectionListener cl : listeners) {
 231  10
             if (c.isOpen()) {
 232  5
                 cl.connectionReceived(c);
 233  
             } else {
 234  5
                 cl.connectionDiscarded(c);
 235  
             }
 236  10
         }
 237  396
     }
 238  
 
 239  
     /**
 240  
      * Registers the given application to handle messages corresponding to the
 241  
      * given type and trigger event. Only one application can be registered for
 242  
      * a given message type and trigger event combination. A repeated
 243  
      * registration for a particular combination of type and trigger event
 244  
      * over-writes the previous one. Note that the wildcard "*" for messageType
 245  
      * or triggerEvent means any type or event, respectively.
 246  
      *
 247  
      * @deprecated use {@link #registerApplication(String, String, ca.uhn.hl7v2.protocol.ReceivingApplication)} and
 248  
      * {@link ca.uhn.hl7v2.protocol.impl.AppWrapper}
 249  
      */
 250  
     public synchronized void registerApplication(String messageType,
 251  
                                                  String triggerEvent, Application handler) {
 252  0
         ReceivingApplication<Message> handlerWrapper = new AppWrapper(handler);
 253  0
         applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handlerWrapper);
 254  0
     }
 255  
 
 256  
     /**
 257  
      * Registers the given application to handle messages corresponding to the
 258  
      * given type and trigger event. Only one application can be registered for
 259  
      * a given message type and trigger event combination. A repeated
 260  
      * registration for a particular combination of type and trigger event
 261  
      * over-writes the previous one. Note that the wildcard "*" for messageType
 262  
      * or triggerEvent means any type or event, respectively.
 263  
      */
 264  
     public void registerApplication(String messageType, String triggerEvent, ReceivingApplication handler) {
 265  85
         applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handler);
 266  85
     }
 267  
 
 268  
     /**
 269  
      * Registers the given application to handle messages corresponding to ALL
 270  
      * message types and trigger events.
 271  
      */
 272  
     public synchronized void registerApplication(AppRoutingData appRouting, ReceivingApplication<? extends Message> application) {
 273  30
         if (appRouting == null) {
 274  0
             throw new NullPointerException("appRouting must not be null");
 275  
         }
 276  30
         applicationRouter.bindApplication(appRouting, application);
 277  30
     }
 278  
 
 279  
     /**
 280  
      * Registers the given application to handle messages corresponding to ALL
 281  
      * message types and trigger events.
 282  
      */
 283  
     public synchronized void registerApplication(ReceivingApplication<? extends Message> application) {
 284  
 
 285  30
         registerApplication(new AppRoutingDataImpl("*", "*", "*", "*"), application);
 286  30
     }
 287  
 
 288  
     /**
 289  
      * Unregisteres the first application that matches the routing data
 290  
      *
 291  
      * @param appRouting
 292  
      * @return true if an application was unregistered, false otherwise
 293  
      */
 294  
     public synchronized boolean unregisterApplication(AppRoutingData appRouting) {
 295  0
         if (appRouting == null) {
 296  0
             throw new NullPointerException("appRouting must not be null");
 297  
         }
 298  0
         return applicationRouter.unbindApplication(appRouting);
 299  
     }
 300  
 
 301  
     /**
 302  
      * Unregisteres the passed application
 303  
      *
 304  
      * @param application receiving application
 305  
      * @return true if an application was unregistered, false otherwise
 306  
      */
 307  
     public synchronized boolean unregisterApplication(ReceivingApplication<? extends Message> application) {
 308  0
         if (application == null) {
 309  0
             throw new NullPointerException("application must not be null");
 310  
         }
 311  0
         return applicationRouter.unbindApplication(application);
 312  
     }
 313  
 
 314  
     /**
 315  
      * Sets an exception handler which will be invoked in the event of a
 316  
      * failure during parsing, processing, or encoding of an
 317  
      * incoming message or its response.
 318  
      */
 319  
     public synchronized void setExceptionHandler(ReceivingApplicationExceptionHandler exHandler) {
 320  0
         applicationRouter.setExceptionHandler(exHandler);
 321  0
     }
 322  
 
 323  
 
 324  
     /**
 325  
      * <p>
 326  
      * A convenience method for registering applications (using
 327  
      * <code>registerApplication()
 328  
      * </code>) with this service. Information about which Applications should
 329  
      * handle which messages is read from the given text file. Each line in the
 330  
      * file should have the following format (entries tab delimited):
 331  
      * </p>
 332  
      * <p>
 333  
      * message_type &#009; trigger_event &#009; application_class
 334  
      * </p>
 335  
      * <p>
 336  
      * message_type &#009; trigger_event &#009; application_class
 337  
      * </p>
 338  
      * <p>
 339  
      * Note that message type and event can be the wildcard "*", which means
 340  
      * any.
 341  
      * </p>
 342  
      * <p>
 343  
      * For example, if you write an Application called
 344  
      * org.yourorganiztion.ADTProcessor that processes several types of ADT
 345  
      * messages, and another called org.yourorganization.ResultProcessor that
 346  
      * processes result messages, you might have a file that looks like this:
 347  
      * </p>
 348  
      * <p>
 349  
      * ADT &#009; * &#009; org.yourorganization.ADTProcessor<br>
 350  
      * ORU &#009; R01 &#009; org.yourorganization.ResultProcessor
 351  
      * </p>
 352  
      * <p>
 353  
      * Each class listed in this file must implement Application and must have a
 354  
      * zero-argument constructor.
 355  
      * </p>
 356  
      */
 357  
     public void loadApplicationsFromFile(File f) throws IOException,
 358  
             HL7Exception, ClassNotFoundException, InstantiationException,
 359  
             IllegalAccessException {
 360  0
         BufferedReader in = null;
 361  
         try {
 362  0
             in = new BufferedReader(new FileReader(f));
 363  
             String line;
 364  0
             while ((line = in.readLine()) != null) {
 365  
                 // parse application registration information
 366  0
                 StringTokenizer tok = new StringTokenizer(line, "\t", false);
 367  
                 String type, event, className;
 368  
 
 369  0
                 if (tok.hasMoreTokens()) { // skip blank lines
 370  
                     try {
 371  0
                         type = tok.nextToken();
 372  0
                         event = tok.nextToken();
 373  0
                         className = tok.nextToken();
 374  0
                     } catch (NoSuchElementException ne) {
 375  0
                         throw new HL7Exception(
 376  
                                 "Can't register applications from file "
 377  0
                                         + f.getName()
 378  
                                         + ". The line '"
 379  
                                         + line
 380  
                                         + "' is not of the form: message_type [tab] trigger_event [tab] application_class.");
 381  0
                     }
 382  
 
 383  
                     try {
 384  
                         @SuppressWarnings("unchecked")
 385  0
                         Class<? extends Application> appClass = (Class<? extends Application>) Class
 386  0
                                 .forName(className); // may throw
 387  
                         // ClassNotFoundException
 388  0
                         Application app = appClass.newInstance();
 389  0
                         registerApplication(type, event, new AppWrapper(app));
 390  0
                     } catch (ClassCastException cce) {
 391  0
                         throw new HL7Exception("The specified class, " + className
 392  
                                 + ", doesn't implement Application.");
 393  0
                     }
 394  
 
 395  
                 }
 396  0
             }
 397  
         } finally {
 398  0
             if (in != null) {
 399  
                 try {
 400  0
                     in.close();
 401  0
                 } catch (IOException e) {
 402  
                     // don't care
 403  0
                 }
 404  
             }
 405  
         }
 406  0
     }
 407  
 
 408  
     /**
 409  
      * Runnable that looks for closed Connections and discards them. It would be
 410  
      * nice to find a way to externalize this safely so that it could be re-used
 411  
      * by (for example) TestPanel. It could take a Vector of Connections as an
 412  
      * argument, instead of an HL7Service, but some problems might arise if
 413  
      * other threads were iterating through the Vector while this one was
 414  
      * removing elements from it.
 415  
      * <p/>
 416  
      * Note: this could be started as daemon, so we don't need to care about
 417  
      * termination.
 418  
      */
 419  
     private static class ConnectionCleaner extends Service {
 420  
 
 421  
         private final HL7Service service;
 422  
 
 423  
         public ConnectionCleaner(HL7Service service) {
 424  145
             super("ConnectionCleaner", service.getExecutorService());
 425  145
             this.service = service;
 426  145
         }
 427  
 
 428  
         @Override
 429  
         public void start() {
 430  135
             log.info("Starting ConnectionCleaner service");
 431  135
             super.start();
 432  135
         }
 433  
 
 434  
         public void handle() {
 435  
             try {
 436  3378
                 Thread.sleep(500);
 437  3305
                 synchronized (service) {
 438  3305
                     Iterator<Connection> it = service.getRemoteConnections()
 439  3305
                             .iterator();
 440  3657
                     while (it.hasNext()) {
 441  352
                         Connection conn = it.next();
 442  352
                         if (!conn.isOpen()) {
 443  324
                             log.debug(
 444  
                                     "Removing connection from {} from connection list",
 445  162
                                     conn.getRemoteAddress().getHostAddress());
 446  162
                             it.remove();
 447  162
                             service.notifyListeners(conn);
 448  
                         }
 449  352
                     }
 450  3305
                 }
 451  43
             } catch (InterruptedException e) {
 452  
                 // don't care
 453  3305
             }
 454  3348
         }
 455  
 
 456  
     }
 457  
 
 458  
 }