001/**
002 The contents of this file are subject to the Mozilla Public License Version 1.1
003 (the "License"); you may not use this file except in compliance with the License.
004 You may obtain a copy of the License at http://www.mozilla.org/MPL/
005 Software distributed under the License is distributed on an "AS IS" basis,
006 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
007 specific language governing rights and limitations under the License.
008
009 The Original Code is "HL7Service.java".  Description:
010 "Accepts incoming TCP/IP connections and creates Connection objects"
011
012 The Initial Developer of the Original Code is University Health Network. Copyright (C)
013 2001.  All Rights Reserved.
014
015 Contributor(s): Kyle Buza
016
017 Alternatively, the contents of this file may be used under the terms of the
018 GNU General Public License (the  �GPL�), in which case the provisions of the GPL are
019 applicable instead of those above.  If you wish to allow use of your version of this
020 file only under the terms of the GPL and not to allow others to use your version
021 of this file under the MPL, indicate your decision by deleting  the provisions above
022 and replace  them with the notice and other provisions required by the GPL License.
023 If you do not delete the provisions above, a recipient may use your version of
024 this file under either the MPL or the GPL.
025
026 */
027
028package ca.uhn.hl7v2.app;
029
030import ca.uhn.hl7v2.HL7Exception;
031import ca.uhn.hl7v2.HapiContext;
032import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
033import ca.uhn.hl7v2.concurrent.Service;
034import ca.uhn.hl7v2.llp.LowerLayerProtocol;
035import ca.uhn.hl7v2.model.Message;
036import ca.uhn.hl7v2.parser.Parser;
037import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
038import ca.uhn.hl7v2.protocol.ReceivingApplication;
039import ca.uhn.hl7v2.protocol.ReceivingApplicationExceptionHandler;
040import ca.uhn.hl7v2.protocol.impl.AppRoutingDataImpl;
041import ca.uhn.hl7v2.protocol.impl.AppWrapper;
042import ca.uhn.hl7v2.protocol.impl.ApplicationRouterImpl;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import java.io.BufferedReader;
047import java.io.File;
048import java.io.FileReader;
049import java.io.IOException;
050import java.util.*;
051import java.util.concurrent.ExecutorService;
052
053/**
054 * <p>
055 * An HL7 service. Accepts incoming TCP/IP connections and creates Connection
056 * objects. Uses a single ApplicationRouter object (for all Connections) to
057 * define the Applications to which message are sent. To configure, use
058 * registerApplication() or loadApplicationsFromFile().
059 * </p>
060 * </p>A separate thread looks for Connections that have been closed (locally or
061 * remotely) and discards them. </p>
062 *
063 * @author Bryan Tripp
064 * @author Christian Ohr
065 */
066public abstract class HL7Service extends Service {
067
068    private static final Logger log = LoggerFactory.getLogger(HL7Service.class);
069
070    private final List<Connection> connections;
071    private final Parser parser;
072    private final LowerLayerProtocol llp;
073    private final List<ConnectionListener> listeners;
074    private final ConnectionCleaner cleaner;
075    private final ApplicationRouterImpl applicationRouter;
076
077    /**
078     * @param theHapiContext HapiContext
079     */
080    public HL7Service(HapiContext theHapiContext) {
081        this(theHapiContext.getGenericParser(), theHapiContext.getLowerLayerProtocol(), theHapiContext.getExecutorService());
082    }
083
084    /**
085     * @param parser parser to be used
086     * @param llp    LowerLayerProtocol
087     */
088    public HL7Service(final Parser parser, final LowerLayerProtocol llp) {
089        this(parser, llp, DefaultExecutorService.getDefaultService());
090    }
091
092    /**
093     * Creates a new instance of Server
094     *
095     * @param parser          parser to be used
096     * @param llp             LowerLayerProtocol
097     * @param executorService executor used for starting threads
098     */
099    public HL7Service(Parser parser, LowerLayerProtocol llp,
100                      ExecutorService executorService) {
101        super("HL7 Server", executorService);
102        this.connections = new ArrayList<Connection>();
103        this.listeners = new ArrayList<ConnectionListener>();
104        this.parser = parser;
105        this.llp = llp;
106        this.applicationRouter = new ApplicationRouterImpl(parser);
107        this.cleaner = new ConnectionCleaner(this);
108
109        // 960101
110        assert !this.cleaner.isRunning();
111    }
112
113    /**
114     * Called after startup before the thread enters its main loop. This
115     * implementation launches a cleaner thread that removes stale connections
116     * from the connection list. Override to initialize resources for the
117     * running thread, e.g. opening {@link java.net.ServerSocket}s etc.
118     */
119    @Override
120    protected void afterStartup() {
121        // Fix for bug 960101: Don't start the cleaner thread until the
122        // server is started.
123        cleaner.start();
124    }
125
126    /**
127     * Called after the thread has left its main loop. This implementation stops
128     * the connection cleaner thread and closes any open connections. Override
129     * to clean up additional resources from the running thread, e.g. closing
130     * {@link java.net.ServerSocket}s.
131     */
132    @Override
133    protected void afterTermination() {
134        super.afterTermination();
135        cleaner.stopAndWait();
136        for (Connection c : connections) {
137            c.close();
138        }
139    }
140
141    /**
142     * Returns true if the thread should continue to run, false otherwise (ie if
143     * stop() has been called).
144     *
145     * @deprecated Use {@link #isRunning()}. Deprecated as of version 0.6.
146     */
147    protected boolean keepRunning() {
148        return isRunning();
149    }
150
151    LowerLayerProtocol getLlp() {
152        return llp;
153    }
154
155    Parser getParser() {
156        return parser;
157    }
158
159    /**
160     * Called by subclasses when a new Connection is made. Registers the
161     * ApplicationRouter with the given Connection and stores it.
162     *
163     * @param c existing connection
164     */
165    public synchronized void newConnection(ActiveConnection c) {
166        c.getResponder().setApplicationRouter(applicationRouter);
167        c.activate();
168        connections.add(c); // keep track of connections
169        notifyListeners(c);
170    }
171
172    /**
173     * Returns a connection to a remote host that was initiated by the given
174     * remote host. If the connection has not been made, this method blocks
175     * until the remote host connects.
176     *
177     * @param ipAddress IP Address
178     * @return connection that was initiated by the given address
179     */
180    public Connection getRemoteConnection(String ipAddress) {
181        Connection conn = null;
182        while (conn == null) {
183            // check all connections ...
184            int c = 0;
185            synchronized (this) {
186                while (conn == null && c < connections.size()) {
187                    Connection nextConn = connections.get(c);
188                    if (nextConn.getRemoteAddress().getHostAddress().equals(ipAddress))
189                        conn = nextConn;
190                    c++;
191                }
192            }
193
194            if (conn == null) {
195                try {
196                    Thread.sleep(100);
197                } catch (InterruptedException e) {
198                    // don't care
199                }
200            }
201        }
202        return conn;
203    }
204
205    /**
206     * Returns all currently active connections.
207     *
208     * @return list of active remote connections
209     */
210    public synchronized List<Connection> getRemoteConnections() {
211        return connections;
212    }
213
214    /**
215     * Registers the given ConnectionListener with the HL7Service - when a
216     * remote host makes a new Connection, all registered listeners will be
217     * notified.
218     *
219     * @param listener connection listener to be called
220     */
221    public synchronized void registerConnectionListener(
222            ConnectionListener listener) {
223        listeners.add(listener);
224    }
225
226    /**
227     * Notifies all listeners that a Connection is new or discarded.
228     */
229    private void notifyListeners(Connection c) {
230        for (ConnectionListener cl : listeners) {
231            if (c.isOpen()) {
232                cl.connectionReceived(c);
233            } else {
234                cl.connectionDiscarded(c);
235            }
236        }
237    }
238
239    /**
240     * Registers the given application to handle messages corresponding to the
241     * given type and trigger event. Only one application can be registered for
242     * a given message type and trigger event combination. A repeated
243     * registration for a particular combination of type and trigger event
244     * over-writes the previous one. Note that the wildcard "*" for messageType
245     * or triggerEvent means any type or event, respectively.
246     *
247     * @deprecated use {@link #registerApplication(String, String, ca.uhn.hl7v2.protocol.ReceivingApplication)} and
248     * {@link ca.uhn.hl7v2.protocol.impl.AppWrapper}
249     */
250    public synchronized void registerApplication(String messageType,
251                                                 String triggerEvent, Application handler) {
252        ReceivingApplication<Message> handlerWrapper = new AppWrapper(handler);
253        applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handlerWrapper);
254    }
255
256    /**
257     * Registers the given application to handle messages corresponding to the
258     * given type and trigger event. Only one application can be registered for
259     * a given message type and trigger event combination. A repeated
260     * registration for a particular combination of type and trigger event
261     * over-writes the previous one. Note that the wildcard "*" for messageType
262     * or triggerEvent means any type or event, respectively.
263     */
264    public void registerApplication(String messageType, String triggerEvent, ReceivingApplication handler) {
265        applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handler);
266    }
267
268    /**
269     * Registers the given application to handle messages corresponding to ALL
270     * message types and trigger events.
271     */
272    public synchronized void registerApplication(AppRoutingData appRouting, ReceivingApplication<? extends Message> application) {
273        if (appRouting == null) {
274            throw new NullPointerException("appRouting must not be null");
275        }
276        applicationRouter.bindApplication(appRouting, application);
277    }
278
279    /**
280     * Registers the given application to handle messages corresponding to ALL
281     * message types and trigger events.
282     */
283    public synchronized void registerApplication(ReceivingApplication<? extends Message> application) {
284
285        registerApplication(new AppRoutingDataImpl("*", "*", "*", "*"), application);
286    }
287
288    /**
289     * Unregisteres the first application that matches the routing data
290     *
291     * @param appRouting
292     * @return true if an application was unregistered, false otherwise
293     */
294    public synchronized boolean unregisterApplication(AppRoutingData appRouting) {
295        if (appRouting == null) {
296            throw new NullPointerException("appRouting must not be null");
297        }
298        return applicationRouter.unbindApplication(appRouting);
299    }
300
301    /**
302     * Unregisteres the passed application
303     *
304     * @param application receiving application
305     * @return true if an application was unregistered, false otherwise
306     */
307    public synchronized boolean unregisterApplication(ReceivingApplication<? extends Message> application) {
308        if (application == null) {
309            throw new NullPointerException("application must not be null");
310        }
311        return applicationRouter.unbindApplication(application);
312    }
313
314    /**
315     * Sets an exception handler which will be invoked in the event of a
316     * failure during parsing, processing, or encoding of an
317     * incoming message or its response.
318     */
319    public synchronized void setExceptionHandler(ReceivingApplicationExceptionHandler exHandler) {
320        applicationRouter.setExceptionHandler(exHandler);
321    }
322
323
324    /**
325     * <p>
326     * A convenience method for registering applications (using
327     * <code>registerApplication()
328     * </code>) with this service. Information about which Applications should
329     * handle which messages is read from the given text file. Each line in the
330     * file should have the following format (entries tab delimited):
331     * </p>
332     * <p>
333     * message_type &#009; trigger_event &#009; application_class
334     * </p>
335     * <p>
336     * message_type &#009; trigger_event &#009; application_class
337     * </p>
338     * <p>
339     * Note that message type and event can be the wildcard "*", which means
340     * any.
341     * </p>
342     * <p>
343     * For example, if you write an Application called
344     * org.yourorganiztion.ADTProcessor that processes several types of ADT
345     * messages, and another called org.yourorganization.ResultProcessor that
346     * processes result messages, you might have a file that looks like this:
347     * </p>
348     * <p>
349     * ADT &#009; * &#009; org.yourorganization.ADTProcessor<br>
350     * ORU &#009; R01 &#009; org.yourorganization.ResultProcessor
351     * </p>
352     * <p>
353     * Each class listed in this file must implement Application and must have a
354     * zero-argument constructor.
355     * </p>
356     */
357    public void loadApplicationsFromFile(File f) throws IOException,
358            HL7Exception, ClassNotFoundException, InstantiationException,
359            IllegalAccessException {
360        BufferedReader in = null;
361        try {
362            in = new BufferedReader(new FileReader(f));
363            String line;
364            while ((line = in.readLine()) != null) {
365                // parse application registration information
366                StringTokenizer tok = new StringTokenizer(line, "\t", false);
367                String type, event, className;
368
369                if (tok.hasMoreTokens()) { // skip blank lines
370                    try {
371                        type = tok.nextToken();
372                        event = tok.nextToken();
373                        className = tok.nextToken();
374                    } catch (NoSuchElementException ne) {
375                        throw new HL7Exception(
376                                "Can't register applications from file "
377                                        + f.getName()
378                                        + ". The line '"
379                                        + line
380                                        + "' is not of the form: message_type [tab] trigger_event [tab] application_class.");
381                    }
382
383                    try {
384                        @SuppressWarnings("unchecked")
385                        Class<? extends Application> appClass = (Class<? extends Application>) Class
386                                .forName(className); // may throw
387                        // ClassNotFoundException
388                        Application app = appClass.newInstance();
389                        registerApplication(type, event, new AppWrapper(app));
390                    } catch (ClassCastException cce) {
391                        throw new HL7Exception("The specified class, " + className
392                                + ", doesn't implement Application.");
393                    }
394
395                }
396            }
397        } finally {
398            if (in != null) {
399                try {
400                    in.close();
401                } catch (IOException e) {
402                    // don't care
403                }
404            }
405        }
406    }
407
408    /**
409     * Runnable that looks for closed Connections and discards them. It would be
410     * nice to find a way to externalize this safely so that it could be re-used
411     * by (for example) TestPanel. It could take a Vector of Connections as an
412     * argument, instead of an HL7Service, but some problems might arise if
413     * other threads were iterating through the Vector while this one was
414     * removing elements from it.
415     * <p/>
416     * Note: this could be started as daemon, so we don't need to care about
417     * termination.
418     */
419    private static class ConnectionCleaner extends Service {
420
421        private final HL7Service service;
422
423        public ConnectionCleaner(HL7Service service) {
424            super("ConnectionCleaner", service.getExecutorService());
425            this.service = service;
426        }
427
428        @Override
429        public void start() {
430            log.info("Starting ConnectionCleaner service");
431            super.start();
432        }
433
434        public void handle() {
435            try {
436                Thread.sleep(500);
437                synchronized (service) {
438                    Iterator<Connection> it = service.getRemoteConnections()
439                            .iterator();
440                    while (it.hasNext()) {
441                        Connection conn = it.next();
442                        if (!conn.isOpen()) {
443                            log.debug(
444                                    "Removing connection from {} from connection list",
445                                    conn.getRemoteAddress().getHostAddress());
446                            it.remove();
447                            service.notifyListeners(conn);
448                        }
449                    }
450                }
451            } catch (InterruptedException e) {
452                // don't care
453            }
454        }
455
456    }
457
458}