001/*
002The contents of this file are subject to the Mozilla Public License Version 1.1 
003(the "License"); you may not use this file except in compliance with the License. 
004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005Software distributed under the License is distributed on an "AS IS" basis, 
006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007specific language governing rights and limitations under the License. 
008
009The Original Code is "HL7Server.java".  Description: 
010"A TCP/IP based server." 
011
012The Initial Developer of the Original Code is University Health Network. Copyright (C) 
0132004.  All Rights Reserved. 
014
015Contributor(s): ______________________________________. 
016
017Alternatively, the contents of this file may be used under the terms of the 
018GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
019applicable instead of those above.  If you wish to allow use of your version of this 
020file only under the terms of the GPL and not to allow others to use your version 
021of this file under the MPL, indicate your decision by deleting  the provisions above 
022and replace  them with the notice and other provisions required by the GPL License.  
023If you do not delete the provisions above, a recipient may use your version of 
024this file under either the MPL or the GPL. 
025*/
026
027package ca.uhn.hl7v2.protocol.impl;
028
029import java.io.IOException;
030import java.net.MalformedURLException;
031import java.net.ServerSocket;
032import java.net.URL;
033import java.util.ArrayList;
034import java.util.Iterator;
035import java.util.List;
036import java.util.StringTokenizer;
037
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import ca.uhn.hl7v2.HL7Exception;
042import ca.uhn.hl7v2.protocol.ApplicationRouter;
043import ca.uhn.hl7v2.protocol.Processor;
044import ca.uhn.hl7v2.protocol.ProcessorContext;
045import ca.uhn.hl7v2.protocol.SafeStorage;
046import ca.uhn.hl7v2.protocol.TransportException;
047import ca.uhn.hl7v2.protocol.TransportLayer;
048
049/**
050 * A TCP/IP based server. 
051 * 
052 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
053 * @version $Revision: 1.2 $ updated on $Date: 2009-06-30 13:30:45 $ by $Author: jamesagnew $
054 */
055public class HL7Server {
056
057    private static Logger log = LoggerFactory.getLogger(HL7Server.class);
058    
059    private final ServerSocket myServerSocket;
060    private ServerSocket myServerSocket2;
061    private final ApplicationRouter myRouter;
062    private final SafeStorage myStorage;
063    
064    private boolean myIsRunning = false;
065    private List<Processor> myProcessors;
066    
067    /**
068     * @param theServerSocket a ServerSocket on which to listen for connections that will
069     *      be used for both locally-driven and remotely-driven message exchanges
070     * @param theRouter used to send incoming messages to appropriate <code>Application</code>s
071     * @param theStorage used to commit incoming messages to safe storage before returning 
072     *      an accept ACK 
073     */
074    public HL7Server(ServerSocket theServerSocket, ApplicationRouter theRouter, SafeStorage theStorage) {
075        myServerSocket = theServerSocket;
076        myRouter = theRouter;
077        myStorage = theStorage;
078        initProcessorList();
079    }
080    
081    /**
082     * @param theLocallyDriven a ServerSocket on which to listen for connections that will
083     *      be used for locally-initiated message exchanges
084     * @param theRemotelyDriven a ServerSocket on which to listen for connections that will
085     *      be used for remotely-initiated message exchanges
086     * @param theRouter used to send incoming messages to appropriate <code>Application</code>s
087     * @param theStorage used to commit incoming messages to safe storage before returning 
088     *      an accept ACK 
089     */
090    public HL7Server(ServerSocket theLocallyDriven, ServerSocket theRemotelyDriven, 
091        ApplicationRouter theRouter, SafeStorage theStorage) {
092    
093        myServerSocket = theLocallyDriven;
094        myServerSocket2 = theRemotelyDriven;
095        myRouter = theRouter;
096        myStorage = theStorage;  
097        initProcessorList();       
098    }
099
100    //creates list and starts thread to clean dead processors from it     
101    private void initProcessorList() {
102        myProcessors = new ArrayList<Processor>();
103        
104        final List<Processor> processors = myProcessors; 
105        Thread cleaner = new Thread() {
106            public void run() {
107                try {
108                    Thread.sleep(1000);
109                } catch (InterruptedException e) {}
110                
111                synchronized (processors) {
112                    Iterator<Processor> it = processors.iterator();
113                    while (it.hasNext()) {
114                        Processor proc = it.next();
115                        if (!proc.getContext().getLocallyDrivenTransportLayer().isConnected() 
116                                || !proc.getContext().getRemotelyDrivenTransportLayer().isConnected()) {
117                            it.remove();
118                        }
119                    }
120                }
121            }
122        };
123        cleaner.start();
124    }
125        
126    /**
127     * Accepts a single inbound connection if the same ServerSocket is used for 
128     * all message exchanges, or a connection from each if two ServerSockets are 
129     * being used. 
130     *  
131     * @param theAddress the IP address from which to accept connections (null means 
132     *      accept from any address).  Connection attempts from other addresses will 
133     *      be ignored.  
134     * @return a <code>Processor</code> connected to the given address  
135     * @throws TransportException
136     */
137    public Processor accept(String theAddress) throws TransportException {
138        TransportLayer transport = getTransport(myServerSocket, theAddress);
139        ProcessorContext context = null;
140        
141        if (myServerSocket2 == null) { //we're doing inbound & outbound on the same port
142            transport.connect();
143            context = new ProcessorContextImpl(myRouter, transport, myStorage);
144        } else {
145            TransportLayer transport2 = getTransport(myServerSocket2, theAddress);
146            DualTransportConnector connector = new DualTransportConnector(transport, transport2);
147            connector.connect();
148            
149            context = new ProcessorContextImpl(myRouter, transport, transport2, myStorage);
150        }
151        return new ProcessorImpl(context, true);        
152    }
153    
154    private static TransportLayer getTransport(ServerSocket theServerSocket, String theAddress) throws TransportException {
155        ServerSocketStreamSource ss = new ServerSocketStreamSource(theServerSocket, theAddress);
156        return new MLLPTransport(ss);
157    }
158    
159    /**
160     * Starts accepting connections in a new Thread.  Note that this can be 
161     * called multiple times with separate addresses.  The stop() method ends
162     * all Threads started here.  
163     * 
164     * @param theAddress IP address from which connections are accepted (null 
165     *  means any address is OK) 
166     */
167    public void start(final String theAddress) {  
168        final HL7Server server = this;      
169        Runnable acceptor = new Runnable() {
170            public void run() {
171                while (server.isRunning()) {
172                    try {
173                        Processor p = server.accept(theAddress);
174                        if (!myIsRunning) {
175                                p.stop();
176                        } else {
177                            server.newProcessor(p); 
178                            Thread.sleep(1);
179                        }
180                    } catch (TransportException e) {
181                        log.error(e.getMessage(), e);
182                    } catch (InterruptedException e) {
183                    } 
184                }
185            }
186        };
187        
188        myIsRunning = true;
189        
190        Thread thd = new Thread(acceptor);
191        thd.start();
192    }
193    
194    private void newProcessor(Processor theProcessor) {
195        synchronized (myProcessors) {
196            myProcessors.add(theProcessor);
197        }
198    }
199    
200    /**
201     * Stops running after the next connection is made. 
202     */
203    public void stop() {
204        myIsRunning = false;
205        synchronized (myProcessors) {
206            for (Processor next : myProcessors) {
207                next.stop();
208            }
209        }
210    }
211    
212    /**
213     * Returns <code>true</code> between when start() returns and when stop() is called.
214     * 
215     * Note that this is not the same as checking whether there are any active connections to
216     * this server. To determine this, call {@link #getProcessors()} and check whether the array
217     * returned is non-empty.
218     * 
219     * @return true between when start() returns and when stop() is called.  
220     */
221    public boolean isRunning() {
222        return myIsRunning;
223    }
224    
225    /**
226     * @return <code>Processor</code>s arising from connections to this server 
227     */
228    public Processor[] getProcessors() {
229        synchronized (myProcessors) {
230            return (Processor[]) myProcessors.toArray(new Processor[0]);
231        }
232    }
233    
234    /**
235     * 
236     * @param theUrlSpec a string specifying an URL, which can optionally begin with "classpath:" 
237     * @return the resource specified after "classpath:", if that's how it starts, otherwise 
238     *      new URL(theUrlSpec) 
239     * @throws MalformedURLException
240     */
241    private static URL getURL(String theUrlSpec) throws MalformedURLException {
242        URL url = null;
243        if (theUrlSpec.startsWith("classpath:")) {
244            StringTokenizer tok = new StringTokenizer(theUrlSpec, ":", false);
245            tok.nextToken();
246            String resource = tok.nextToken();
247            url = Thread.currentThread().getContextClassLoader().getResource(resource);
248        } else {
249            url = new URL(theUrlSpec);
250        }
251        return url;
252    }
253    
254    public static void main(String[] args) {
255        if (args.length < 1 || args.length > 3) {
256            System.out.println("Usage: HL7Server (shared_port | (locally_driven_port remotely_driven_port)) app_binding_URL");
257            System.exit(1);
258        }
259        
260        SafeStorage storage = new NullSafeStorage();
261        ApplicationRouter router = new ApplicationRouterImpl();
262        
263        try {
264            HL7Server server = null;
265            String appURL = null;
266            if (args.length == 2) {
267                int port = Integer.parseInt(args[0]);
268                server = new HL7Server(new ServerSocket(port), router, storage);
269                appURL = args[1];                
270            } else {
271                int localPort = Integer.parseInt(args[0]);
272                int remotePort = Integer.parseInt(args[1]);
273                server = new HL7Server(new ServerSocket(localPort), new ServerSocket(remotePort), router, storage);
274                appURL = args[2];
275            }
276            
277            ApplicationLoader.loadApplications(router, getURL(appURL));
278            
279            server.start(null); //any address OK            
280            
281        } catch (NumberFormatException e) {
282            System.out.println("Port arguments must be integers");
283            System.exit(2);
284        } catch (IOException e) {
285            e.printStackTrace();
286            System.exit(3);
287        } catch (HL7Exception e) {
288            e.printStackTrace();
289            System.exit(4);
290        } catch (ClassNotFoundException e) {
291            e.printStackTrace();
292            System.exit(5);
293        } catch (InstantiationException e) {
294            e.printStackTrace();
295            System.exit(6);
296        } catch (IllegalAccessException e) {
297            e.printStackTrace();
298            System.exit(7);
299        } 
300
301    }
302}