View Javadoc
1   /*
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "HL7Server.java".  Description: 
10  "A TCP/IP based server." 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2004.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25  */
26  
27  package ca.uhn.hl7v2.protocol.impl;
28  
29  import java.io.IOException;
30  import java.net.MalformedURLException;
31  import java.net.ServerSocket;
32  import java.net.URL;
33  import java.util.ArrayList;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.StringTokenizer;
37  
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import ca.uhn.hl7v2.HL7Exception;
42  import ca.uhn.hl7v2.protocol.ApplicationRouter;
43  import ca.uhn.hl7v2.protocol.Processor;
44  import ca.uhn.hl7v2.protocol.ProcessorContext;
45  import ca.uhn.hl7v2.protocol.SafeStorage;
46  import ca.uhn.hl7v2.protocol.TransportException;
47  import ca.uhn.hl7v2.protocol.TransportLayer;
48  
49  /**
50   * A TCP/IP based server.
51   *
52   * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
53   * @version $Revision: 1.2 $ updated on $Date: 2009-06-30 13:30:45 $ by $Author: jamesagnew $
54   */
55  public class HL7Server {
56  
57      private static final Logger log = LoggerFactory.getLogger(HL7Server.class);
58  
59      private final ServerSocket myServerSocket;
60      private ServerSocket myServerSocket2;
61      private final ApplicationRouter myRouter;
62      private final SafeStorage myStorage;
63  
64      private boolean myIsRunning = false;
65      private List<Processor> myProcessors;
66  
67      /**
68       * @param theServerSocket a ServerSocket on which to listen for connections that will
69       *                        be used for both locally-driven and remotely-driven message exchanges
70       * @param theRouter       used to send incoming messages to appropriate <code>Application</code>s
71       * @param theStorage      used to commit incoming messages to safe storage before returning
72       *                        an accept ACK
73       */
74      public HL7Server(ServerSocket theServerSocket, ApplicationRouter theRouter, SafeStorage theStorage) {
75          myServerSocket = theServerSocket;
76          myRouter = theRouter;
77          myStorage = theStorage;
78          initProcessorList();
79      }
80  
81      /**
82       * @param theLocallyDriven  a ServerSocket on which to listen for connections that will
83       *                          be used for locally-initiated message exchanges
84       * @param theRemotelyDriven a ServerSocket on which to listen for connections that will
85       *                          be used for remotely-initiated message exchanges
86       * @param theRouter         used to send incoming messages to appropriate <code>Application</code>s
87       * @param theStorage        used to commit incoming messages to safe storage before returning
88       *                          an accept ACK
89       */
90      public HL7Server(ServerSocket theLocallyDriven, ServerSocket theRemotelyDriven,
91                       ApplicationRouter theRouter, SafeStorage theStorage) {
92  
93          myServerSocket = theLocallyDriven;
94          myServerSocket2 = theRemotelyDriven;
95          myRouter = theRouter;
96          myStorage = theStorage;
97          initProcessorList();
98      }
99  
100     //creates list and starts thread to clean dead processors from it     
101     private void initProcessorList() {
102         myProcessors = new ArrayList<>();
103 
104         final List<Processor> processors = myProcessors;
105         Thread cleaner = new Thread(() -> {
106             try {
107                 Thread.sleep(1000);
108             } catch (InterruptedException ignored) {
109             }
110 
111             synchronized (processors) {
112                 Iterator<Processor> it = processors.iterator();
113                 while (it.hasNext()) {
114                     Processor proc = it.next();
115                     if (!proc.getContext().getLocallyDrivenTransportLayer().isConnected()
116                             || !proc.getContext().getRemotelyDrivenTransportLayer().isConnected()) {
117                         it.remove();
118                     }
119                 }
120             }
121         });
122         cleaner.start();
123     }
124 
125     /**
126      * Accepts a single inbound connection if the same ServerSocket is used for
127      * all message exchanges, or a connection from each if two ServerSockets are
128      * being used.
129      *
130      * @param theAddress the IP address from which to accept connections (null means
131      *                   accept from any address).  Connection attempts from other addresses will
132      *                   be ignored.
133      * @return a <code>Processor</code> connected to the given address
134      * @throws TransportException
135      */
136     public Processor accept(String theAddress) throws TransportException {
137         TransportLayer transport = getTransport(myServerSocket, theAddress);
138         ProcessorContext context;
139 
140         if (myServerSocket2 == null) { //we're doing inbound & outbound on the same port
141             transport.connect();
142             context = new ProcessorContextImpl(myRouter, transport, myStorage);
143         } else {
144             TransportLayer transport2 = getTransport(myServerSocket2, theAddress);
145             DualTransportConnectorector.html#DualTransportConnector">DualTransportConnector connector = new DualTransportConnector(transport, transport2);
146             connector.connect();
147 
148             context = new ProcessorContextImpl(myRouter, transport, transport2, myStorage);
149         }
150         return new ProcessorImpl(context, true);
151     }
152 
153     private static TransportLayer getTransport(ServerSocket theServerSocket, String theAddress) throws TransportException {
154         ServerSocketStreamSourceStreamSource.html#ServerSocketStreamSource">ServerSocketStreamSource ss = new ServerSocketStreamSource(theServerSocket, theAddress);
155         return new MLLPTransport(ss);
156     }
157 
158     /**
159      * Starts accepting connections in a new Thread.  Note that this can be
160      * called multiple times with separate addresses.  The stop() method ends
161      * all Threads started here.
162      *
163      * @param theAddress IP address from which connections are accepted (null
164      *                   means any address is OK)
165      */
166     public void start(final String theAddress) {
167         final HL7Server server = this;
168         Runnable acceptor = () -> {
169             while (server.isRunning()) {
170                 try {
171                     Processor p = server.accept(theAddress);
172                     if (!myIsRunning) {
173                         p.stop();
174                     } else {
175                         server.newProcessor(p);
176                         Thread.sleep(1);
177                     }
178                 } catch (TransportException e) {
179                     log.error(e.getMessage(), e);
180                 } catch (InterruptedException ignored) {
181                 }
182             }
183         };
184 
185         myIsRunning = true;
186 
187         Thread thd = new Thread(acceptor);
188         thd.start();
189     }
190 
191     private synchronized void newProcessor(Processor theProcessor) {
192         myProcessors.add(theProcessor);
193     }
194 
195     /**
196      * Stops running after the next connection is made.
197      */
198     public synchronized void stop() {
199         myIsRunning = false;
200         for (Processor next : myProcessors) {
201             next.stop();
202         }
203     }
204 
205     /**
206      * Returns <code>true</code> between when start() returns and when stop() is called.
207      * <p>
208      * Note that this is not the same as checking whether there are any active connections to
209      * this server. To determine this, call {@link #getProcessors()} and check whether the array
210      * returned is non-empty.
211      *
212      * @return true between when start() returns and when stop() is called.
213      */
214     public boolean isRunning() {
215         return myIsRunning;
216     }
217 
218     /**
219      * @return <code>Processor</code>s arising from connections to this server
220      */
221     public Processor[] getProcessors() {
222         return myProcessors.toArray(new Processor[0]);
223     }
224 
225     /**
226      * @param theUrlSpec a string specifying an URL, which can optionally begin with "classpath:"
227      * @return the resource specified after "classpath:", if that's how it starts, otherwise
228      * new URL(theUrlSpec)
229      * @throws MalformedURLException
230      */
231     private static URL getURL(String theUrlSpec) throws MalformedURLException {
232         URL url;
233         if (theUrlSpec.startsWith("classpath:")) {
234             StringTokenizer tok = new StringTokenizer(theUrlSpec, ":", false);
235             tok.nextToken();
236             String resource = tok.nextToken();
237             url = Thread.currentThread().getContextClassLoader().getResource(resource);
238         } else {
239             url = new URL(theUrlSpec);
240         }
241         return url;
242     }
243 
244     public static void main(String[] args) {
245         if (args.length < 1 || args.length > 3) {
246             System.out.println("Usage: HL7Server (shared_port | (locally_driven_port remotely_driven_port)) app_binding_URL");
247             System.exit(1);
248         }
249 
250         SafeStorage storage = new NullSafeStorage();
251         ApplicationRouter router = new ApplicationRouterImpl();
252 
253         try {
254             HL7Server server;
255             String appURL;
256             if (args.length == 2) {
257                 int port = Integer.parseInt(args[0]);
258                 server = new HL7Server(new ServerSocket(port), router, storage);
259                 appURL = args[1];
260             } else {
261                 int localPort = Integer.parseInt(args[0]);
262                 int remotePort = Integer.parseInt(args[1]);
263                 server = new HL7Server(new ServerSocket(localPort), new ServerSocket(remotePort), router, storage);
264                 appURL = args[2];
265             }
266 
267             ApplicationLoader.loadApplications(router, getURL(appURL));
268 
269             server.start(null); //any address OK            
270 
271         } catch (NumberFormatException e) {
272             System.out.println("Port arguments must be integers");
273             System.exit(2);
274         } catch (IOException e) {
275             e.printStackTrace();
276             System.exit(3);
277         } catch (HL7Exception e) {
278             e.printStackTrace();
279             System.exit(4);
280         } catch (ClassNotFoundException e) {
281             e.printStackTrace();
282             System.exit(5);
283         } catch (InstantiationException e) {
284             e.printStackTrace();
285             System.exit(6);
286         } catch (IllegalAccessException e) {
287             e.printStackTrace();
288             System.exit(7);
289         }
290 
291     }
292 }