View Javadoc
1   /**
2    The contents of this file are subject to the Mozilla Public License Version 1.1
3    (the "License"); you may not use this file except in compliance with the License.
4    You may obtain a copy of the License at http://www.mozilla.org/MPL/
5    Software distributed under the License is distributed on an "AS IS" basis,
6    WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
7    specific language governing rights and limitations under the License.
8   
9    The Original Code is "HL7Service.java".  Description:
10   "Accepts incoming TCP/IP connections and creates Connection objects"
11  
12   The Initial Developer of the Original Code is University Health Network. Copyright (C)
13   2001.  All Rights Reserved.
14  
15   Contributor(s): Kyle Buza
16  
17   Alternatively, the contents of this file may be used under the terms of the
18   GNU General Public License (the "GPL"), in which case the provisions of the GPL are
19   applicable instead of those above.  If you wish to allow use of your version of this
20   file only under the terms of the GPL and not to allow others to use your version
21   of this file under the MPL, indicate your decision by deleting  the provisions above
22   and replace  them with the notice and other provisions required by the GPL License.
23   If you do not delete the provisions above, a recipient may use your version of
24   this file under either the MPL or the GPL.
25  
26   */
27  
28  package ca.uhn.hl7v2.app;
29  
30  import ca.uhn.hl7v2.HL7Exception;
31  import ca.uhn.hl7v2.HapiContext;
32  import ca.uhn.hl7v2.app.Receiver.ReceiverParserExceptionHandler;
33  import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
34  import ca.uhn.hl7v2.concurrent.Service;
35  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
36  import ca.uhn.hl7v2.model.Message;
37  import ca.uhn.hl7v2.parser.Parser;
38  import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
39  import ca.uhn.hl7v2.protocol.ReceivingApplication;
40  import ca.uhn.hl7v2.protocol.ReceivingApplicationExceptionHandler;
41  import ca.uhn.hl7v2.protocol.impl.AppRoutingDataImpl;
42  import ca.uhn.hl7v2.protocol.impl.AppWrapper;
43  import ca.uhn.hl7v2.protocol.impl.ApplicationRouterImpl;
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  import java.io.BufferedReader;
48  import java.io.File;
49  import java.io.FileReader;
50  import java.io.IOException;
51  import java.util.*;
52  import java.util.concurrent.ExecutorService;
53  
54  /**
55   * <p>
56   * An HL7 service. Accepts incoming TCP/IP connections and creates Connection
57   * objects. Uses a single ApplicationRouter object (for all Connections) to
58   * define the Applications to which message are sent. To configure, use
59   * registerApplication() or loadApplicationsFromFile().
60   * </p>
61   * </p>A separate thread looks for Connections that have been closed (locally or
62   * remotely) and discards them. </p>
63   *
64   * @author Bryan Tripp
65   * @author Christian Ohr
66   */
67  public abstract class HL7Service extends Service {
68  
69      private static final Logger log = LoggerFactory.getLogger(HL7Service.class);
70  
71      private final List<Connection> connections;
72      private final Parser parser;
73      private final LowerLayerProtocol llp;
74      private final List<ConnectionListener> listeners;
75      private final ConnectionCleaner cleaner;
76      private final ApplicationRouterImpl applicationRouter;
77      private ReceiverParserExceptionHandler parserExeptionHandler;
78  
79      /**
80       * @param theHapiContext HapiContext
81       */
82      public HL7Service(HapiContext theHapiContext) {
83          this(theHapiContext.getGenericParser(), theHapiContext.getLowerLayerProtocol(), theHapiContext.getExecutorService());
84      }
85  
86      /**
87       * @param parser parser to be used
88       * @param llp    LowerLayerProtocol
89       */
90      public HL7Service(final Parser parser, final LowerLayerProtocol llp) {
91          this(parser, llp, DefaultExecutorService.getDefaultService());
92      }
93  
94      /**
95       * Creates a new instance of Server
96       *
97       * @param parser          parser to be used
98       * @param llp             LowerLayerProtocol
99       * @param executorService executor used for starting threads
100      */
101     public HL7Service(Parser parser, LowerLayerProtocol llp,
102                       ExecutorService executorService) {
103         super("HL7 Server", executorService);
104         this.connections = new ArrayList<>();
105         this.listeners = new ArrayList<>();
106         this.parser = parser;
107         this.llp = llp;
108         this.applicationRouter = new ApplicationRouterImpl(parser);
109         this.cleaner = new ConnectionCleaner(this);
110 
111         // 960101
112         assert !this.cleaner.isRunning();
113     }
114 
115     /**
116      * Called after startup before the thread enters its main loop. This
117      * implementation launches a cleaner thread that removes stale connections
118      * from the connection list. Override to initialize resources for the
119      * running thread, e.g. opening {@link java.net.ServerSocket}s etc.
120      */
121     @Override
122     protected void afterStartup() {
123         // Fix for bug 960101: Don't start the cleaner thread until the
124         // server is started.
125         cleaner.start();
126     }
127 
128     /**
129      * Called after the thread has left its main loop. This implementation stops
130      * the connection cleaner thread and closes any open connections. Override
131      * to clean up additional resources from the running thread, e.g. closing
132      * {@link java.net.ServerSocket}s.
133      */
134     @Override
135     protected void afterTermination() {
136         super.afterTermination();
137         cleaner.stopAndWait();
138         for (Connection c : connections) {
139             try {
140                 c.close();
141             } catch (IOException e) {
142                 throw new RuntimeException(e);
143             }
144         }
145     }
146 
147     /**
148      * Returns true if the thread should continue to run, false otherwise (ie if
149      * stop() has been called).
150      *
151      * @deprecated Use {@link #isRunning()}. Deprecated as of version 0.6.
152      */
153     protected boolean keepRunning() {
154         return isRunning();
155     }
156 
157     LowerLayerProtocol getLlp() {
158         return llp;
159     }
160 
161     Parser getParser() {
162         return parser;
163     }
164 
165     /**
166      * Called by subclasses when a new Connection is made. Registers the
167      * ApplicationRouter with the given Connection and stores it.
168      *
169      * @param c existing connection
170      */
171     public synchronized void newConnection(ActiveConnection c) {
172         c.getResponder().setApplicationRouter(applicationRouter);
173         c.setReceiverParserExeptionHandler(parserExeptionHandler);
174         c.activate();
175         connections.add(c); // keep track of connections
176         notifyListeners(c);
177     }
178 
179     /**
180      * Returns a connection to a remote host that was initiated by the given
181      * remote host. If the connection has not been made, this method blocks
182      * until the remote host connects.
183      *
184      * @param ipAddress IP Address
185      * @return connection that was initiated by the given address
186      */
187     public Connection getRemoteConnection(String ipAddress) {
188         Connection conn = null;
189         while (conn == null) {
190             // check all connections ...
191             int c = 0;
192             synchronized (this) {
193                 while (conn == null && c < connections.size()) {
194                     Connection nextConn = connections.get(c);
195                     if (nextConn.getRemoteAddress().getHostAddress().equals(ipAddress))
196                         conn = nextConn;
197                     c++;
198                 }
199             }
200 
201             if (conn == null) {
202                 try {
203                     Thread.sleep(100);
204                 } catch (InterruptedException e) {
205                     // don't care
206                 }
207             }
208         }
209         return conn;
210     }
211 
212     /**
213      * Returns all currently active connections.
214      *
215      * @return list of active remote connections
216      */
217     public synchronized List<Connection> getRemoteConnections() {
218         return connections;
219     }
220 
221     /**
222      * Registers the given ConnectionListener with the HL7Service - when a
223      * remote host makes a new Connection, all registered listeners will be
224      * notified.
225      *
226      * @param listener connection listener to be called
227      */
228     public synchronized void registerConnectionListener(
229             ConnectionListener listener) {
230         listeners.add(listener);
231     }
232 
233     /**
234      * Notifies all listeners that a Connection is new or discarded.
235      */
236     private void notifyListeners(Connection c) {
237         for (ConnectionListener cl : listeners) {
238             if (c.isOpen()) {
239                 cl.connectionReceived(c);
240             } else {
241                 cl.connectionDiscarded(c);
242             }
243         }
244     }
245 
246     /**
247      * Registers the given application to handle messages corresponding to the
248      * given type and trigger event. Only one application can be registered for
249      * a given message type and trigger event combination. A repeated
250      * registration for a particular combination of type and trigger event
251      * over-writes the previous one. Note that the wildcard "*" for messageType
252      * or triggerEvent means any type or event, respectively.
253      *
254      * @deprecated use {@link #registerApplication(String, String, ca.uhn.hl7v2.protocol.ReceivingApplication)} and
255      * {@link ca.uhn.hl7v2.protocol.impl.AppWrapper}
256      */
257     public synchronized void registerApplication(String messageType,
258                                                  String triggerEvent, Application handler) {
259         ReceivingApplication<Message> handlerWrapper = new AppWrapper(handler);
260         applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handlerWrapper);
261     }
262 
263     /**
264      * Registers the given application to handle messages corresponding to the
265      * given type and trigger event. Only one application can be registered for
266      * a given message type and trigger event combination. A repeated
267      * registration for a particular combination of type and trigger event
268      * over-writes the previous one. Note that the wildcard "*" for messageType
269      * or triggerEvent means any type or event, respectively.
270      */
271     public void registerApplication(String messageType, String triggerEvent, ReceivingApplication handler) {
272         applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handler);
273     }
274 
275     /**
276      * Registers the given application to handle messages corresponding to ALL
277      * message types and trigger events.
278      */
279     public synchronized void registerApplication(AppRoutingData appRouting, ReceivingApplication<? extends Message> application) {
280         if (appRouting == null) {
281             throw new NullPointerException("appRouting must not be null");
282         }
283         applicationRouter.bindApplication(appRouting, application);
284     }
285 
286     /**
287      * Registers the given application to handle messages corresponding to ALL
288      * message types and trigger events.
289      */
290     public synchronized void registerApplication(ReceivingApplication<? extends Message> application) {
291 
292         registerApplication(new AppRoutingDataImpl("*", "*", "*", "*"), application);
293     }
294 
295     /**
296      * Unregisteres the first application that matches the routing data
297      *
298      * @param appRouting
299      * @return true if an application was unregistered, false otherwise
300      */
301     public synchronized boolean unregisterApplication(AppRoutingData appRouting) {
302         if (appRouting == null) {
303             throw new NullPointerException("appRouting must not be null");
304         }
305         return applicationRouter.unbindApplication(appRouting);
306     }
307 
308     /**
309      * Unregisteres the passed application
310      *
311      * @param application receiving application
312      * @return true if an application was unregistered, false otherwise
313      */
314     public synchronized boolean unregisterApplication(ReceivingApplication<? extends Message> application) {
315         if (application == null) {
316             throw new NullPointerException("application must not be null");
317         }
318         return applicationRouter.unbindApplication(application);
319     }
320 
321     /**
322      * Sets an exception handler which will be invoked in the event of a
323      * failure during parsing, processing, or encoding of an
324      * incoming message or its response.
325      */
326     public synchronized void setExceptionHandler(ReceivingApplicationExceptionHandler exHandler) {
327         applicationRouter.setExceptionHandler(exHandler);
328     }
329 
330     /**
331      * Register a receiver level parser exception handler so that an application can be notified
332      * of protocol level parsing errors if any
333      */
334     public void setParserExeptionHandler(ReceiverParserExceptionHandler parserExeptionHandler) {
335     	this.parserExeptionHandler = parserExeptionHandler;
336     }
337 
338     /**
339      * <p>
340      * A convenience method for registering applications (using
341      * <code>registerApplication()
342      * </code>) with this service. Information about which Applications should
343      * handle which messages is read from the given text file. Each line in the
344      * file should have the following format (entries tab delimited):
345      * </p>
346      * <p>
347      * message_type &#009; trigger_event &#009; application_class
348      * </p>
349      * <p>
350      * message_type &#009; trigger_event &#009; application_class
351      * </p>
352      * <p>
353      * Note that message type and event can be the wildcard "*", which means
354      * any.
355      * </p>
356      * <p>
357      * For example, if you write an Application called
358      * org.yourorganiztion.ADTProcessor that processes several types of ADT
359      * messages, and another called org.yourorganization.ResultProcessor that
360      * processes result messages, you might have a file that looks like this:
361      * </p>
362      * <p>
363      * ADT &#009; * &#009; org.yourorganization.ADTProcessor<br>
364      * ORU &#009; R01 &#009; org.yourorganization.ResultProcessor
365      * </p>
366      * <p>
367      * Each class listed in this file must implement Application and must have a
368      * zero-argument constructor.
369      * </p>
370      */
371     public void loadApplicationsFromFile(File f) throws IOException,
372             HL7Exception, ClassNotFoundException, InstantiationException,
373             IllegalAccessException {
374         try (BufferedReader in = new BufferedReader(new FileReader(f))) {
375             String line;
376             while ((line = in.readLine()) != null) {
377                 // parse application registration information
378                 StringTokenizer tok = new StringTokenizer(line, "\t", false);
379                 String type, event, className;
380 
381                 if (tok.hasMoreTokens()) { // skip blank lines
382                     try {
383                         type = tok.nextToken();
384                         event = tok.nextToken();
385                         className = tok.nextToken();
386                     } catch (NoSuchElementException ne) {
387                         throw new HL7Exception(
388                                 "Can't register applications from file "
389                                         + f.getName()
390                                         + ". The line '"
391                                         + line
392                                         + "' is not of the form: message_type [tab] trigger_event [tab] application_class.");
393                     }
394 
395                     try {
396                         @SuppressWarnings("unchecked")
397                         Class<? extends Application> appClass = (Class<? extends Application>) Class
398                                 .forName(className); // may throw
399                         // ClassNotFoundException
400                         Application app = appClass.newInstance();
401                         registerApplication(type, event, new AppWrapper(app));
402                     } catch (ClassCastException cce) {
403                         throw new HL7Exception("The specified class, " + className
404                                 + ", doesn't implement Application.");
405                     }
406 
407                 }
408             }
409         }
410         // don't care
411     }
412 
413     /**
414      * Runnable that looks for closed Connections and discards them. It would be
415      * nice to find a way to externalize this safely so that it could be re-used
416      * by (for example) TestPanel. It could take a Vector of Connections as an
417      * argument, instead of an HL7Service, but some problems might arise if
418      * other threads were iterating through the Vector while this one was
419      * removing elements from it.
420      * <p/>
421      * Note: this could be started as daemon, so we don't need to care about
422      * termination.
423      */
424     private static class ConnectionCleaner extends Service {
425 
426         private final HL7Service service;
427 
428         public ConnectionCleaner(HL7Service service) {
429             super("ConnectionCleaner", service.getExecutorService());
430             this.service = service;
431         }
432 
433         @Override
434         public void start() {
435             log.info("Starting ConnectionCleaner service");
436             super.start();
437         }
438 
439         public void handle() {
440             try {
441                 Thread.sleep(500);
442                 synchronized (service) {
443                     Iterator<Connection> it = service.getRemoteConnections()
444                             .iterator();
445                     while (it.hasNext()) {
446                         Connection conn = it.next();
447                         if (!conn.isOpen()) {
448                             log.debug(
449                                     "Removing connection from {} from connection list",
450                                     conn.getRemoteAddress().getHostAddress());
451                             it.remove();
452                             service.notifyListeners(conn);
453                         }
454                     }
455                 }
456             } catch (InterruptedException e) {
457                 // don't care
458             }
459         }
460 
461     }
462 
463 }