001/**
002The contents of this file are subject to the Mozilla Public License Version 1.1 
003(the "License"); you may not use this file except in compliance with the License. 
004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005Software distributed under the License is distributed on an "AS IS" basis, 
006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007specific language governing rights and limitations under the License. 
008
009The Original Code is "TwoPortService.java".  Description: 
010"A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound messages." 
011
012The Initial Developer of the Original Code is University Health Network. Copyright (C) 
0132001.  All Rights Reserved. 
014
015Contributor(s): ______________________________________. 
016
017Alternatively, the contents of this file may be used under the terms of the 
018GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
019applicable instead of those above.  If you wish to allow use of your version of this 
020file only under the terms of the GPL and not to allow others to use your version 
021of this file under the MPL, indicate your decision by deleting  the provisions above 
022and replace  them with the notice and other provisions required by the GPL License.  
023If you do not delete the provisions above, a recipient may use your version of 
024this file under either the MPL or the GPL. 
025 */
026
027package ca.uhn.hl7v2.app;
028
029import java.io.File;
030import java.io.IOException;
031import java.net.Socket;
032import java.net.SocketException;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.TimeUnit;
039
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import ca.uhn.hl7v2.DefaultHapiContext;
044import ca.uhn.hl7v2.HapiContext;
045import ca.uhn.hl7v2.app.AcceptorThread.AcceptedSocket;
046import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
047import ca.uhn.hl7v2.llp.LLPException;
048import ca.uhn.hl7v2.llp.LowerLayerProtocol;
049import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
050import ca.uhn.hl7v2.parser.Parser;
051import ca.uhn.hl7v2.parser.PipeParser;
052import ca.uhn.hl7v2.util.SocketFactory;
053
054/**
055 * A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound
056 * messages. A connection is only activated when the same remote host connects
057 * to both the inbound and outbound ports.
058 * 
059 * @author Bryan Tripp
060 */
061public class TwoPortService extends HL7Service {
062
063        private static final Logger log = LoggerFactory
064                        .getLogger(TwoPortService.class);
065
066        private Map<String, AcceptedSocket> waitingForSecondSocket = new HashMap<String, AcceptedSocket>();
067        private int inboundPort;
068        private int outboundPort;
069        private boolean tls;
070        private BlockingQueue<AcceptedSocket> queue;
071        private AcceptorThread inboundAcceptor, outboundAcceptor;
072        private final HapiContext hapiContext;
073
074        public TwoPortService(int inboundPort, int outboundPort) {
075                this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort,
076                                outboundPort, false);
077        }
078
079        public TwoPortService(int inboundPort, int outboundPort, boolean tls) {
080                this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort,
081                                outboundPort, tls);
082        }
083
084        /** Creates a new instance of TwoPortService */
085        public TwoPortService(Parser parser, LowerLayerProtocol llp,
086                        int inboundPort, int outboundPort, boolean tls) {
087                this(parser, llp, inboundPort, outboundPort, tls,
088                                DefaultExecutorService.getDefaultService());
089        }
090
091        /** Creates a new instance of TwoPortService */
092        public TwoPortService(HapiContext hapiContext, 
093                        int inboundPort, int outboundPort, boolean tls) {
094                super(hapiContext);
095                this.hapiContext = hapiContext;
096                this.queue = new LinkedBlockingQueue<AcceptedSocket>();
097                this.inboundPort = inboundPort;
098                this.outboundPort = outboundPort;
099                this.tls = tls;
100                
101                if (inboundPort == outboundPort) {
102                        throw new IllegalArgumentException("Inbound port and outbound port can not be the same");
103                }
104                if (inboundPort < 1) {
105                        throw new IllegalArgumentException("Invalid inbound port");
106                }
107                if (outboundPort < 1) {
108                        throw new IllegalArgumentException("Invalid outbound port");
109                }
110                
111        }
112
113        /** Creates a new instance of TwoPortService */
114        public TwoPortService(Parser parser, LowerLayerProtocol llp,
115                        int inboundPort, int outboundPort, boolean tls,
116                        ExecutorService executorService) {
117                super(parser, llp, executorService);
118                this.hapiContext = new DefaultHapiContext();
119                this.queue = new LinkedBlockingQueue<AcceptedSocket>();
120                this.inboundPort = inboundPort;
121                this.outboundPort = outboundPort;
122                this.tls = tls;
123        }
124
125        /**
126         * Launches two threads that concurrently listen on the inboundPort and
127         * outboundPort.
128         * 
129         * @see ca.uhn.hl7v2.app.HL7Service#afterStartup()
130         */
131        @Override
132        protected void afterStartup() {
133                try {
134                        super.afterStartup();
135                        inboundAcceptor = createAcceptThread(inboundPort);
136                        outboundAcceptor = createAcceptThread(outboundPort);
137                        inboundAcceptor.start();
138                        outboundAcceptor.start();
139                        log.info("TwoPortService running on ports {} and {}", inboundPort,
140                                        outboundPort);
141                } catch (IOException e) {
142                        log.error("Could not run TwoPortService on ports {} and {}",
143                                        inboundPort, outboundPort);
144                        throw new RuntimeException(e);
145                }
146        }
147
148        /**
149         * Terminate the two acceptor threads
150         * 
151         * @see ca.uhn.hl7v2.app.HL7Service#afterTermination()
152         */
153        @Override
154        protected void afterTermination() {
155                super.afterTermination();
156                inboundAcceptor.stop();
157                outboundAcceptor.stop();
158        }
159
160        /**
161         * Polls for accepted sockets
162         */
163        protected void handle() {
164                if (inboundAcceptor.getServiceExitedWithException() != null) {
165                        setServiceExitedWithException(inboundAcceptor.getServiceExitedWithException());
166                }
167                if (outboundAcceptor.getServiceExitedWithException() != null) {
168                        setServiceExitedWithException(outboundAcceptor.getServiceExitedWithException());
169                }
170                
171                try {
172                        ActiveConnection conn = acceptConnection(queue.poll(2, TimeUnit.SECONDS));
173                        if (conn != null) {
174                                log.info("Accepted connection from "
175                                                + conn.getRemoteAddress().getHostAddress());
176                                newConnection(conn);
177                        }
178                } catch (Exception e) {
179                        log.error("Error while accepting connections: ", e);
180                }
181        }
182
183        /**
184         * Helper method that checks whether the newSocket completes a two-port
185         * connection or not. If yes, the {@link ActiveConnection} object is created and
186         * returned.
187         */
188        private ActiveConnection acceptConnection(AcceptedSocket newSocket)
189                        throws LLPException, IOException {
190                ActiveConnection conn = null;
191                if (newSocket != null) {
192                        String address = newSocket.socket.getInetAddress().getHostAddress();
193                        AcceptedSocket otherSocket = waitingForSecondSocket.remove(address);
194                        if (otherSocket != null && otherSocket.origin != newSocket.origin) {
195                                log.debug("Socket {} completes a two-port connection",
196                                                newSocket.socket);
197                                Socket in = getInboundSocket(newSocket, otherSocket);
198                                Socket out = getOutboundSocket(newSocket, otherSocket);
199                                conn = new ActiveConnection(getParser(), getLlp(), in, out,
200                                                getExecutorService());
201                        } else {
202                                log.debug(
203                                                "Registered {} Still waiting for second socket for two-port connection",
204                                                newSocket.socket);
205                                waitingForSecondSocket.put(address, newSocket);
206                        }
207                }
208                return conn;
209        }
210
211        private Socket getInboundSocket(AcceptedSocket socket1,
212                        AcceptedSocket socket2) {
213                return socket1.origin == inboundAcceptor ? socket1.socket
214                                : socket2.socket;
215        }
216
217        private Socket getOutboundSocket(AcceptedSocket socket1,
218                        AcceptedSocket socket2) {
219                return socket1.origin == outboundAcceptor ? socket1.socket
220                                : socket2.socket;
221        }
222
223        protected AcceptorThread createAcceptThread(int port)
224                        throws SocketException, IOException {
225                SocketFactory ss = this.hapiContext.getSocketFactory();
226                return new AcceptorThread(port, tls, getExecutorService(), queue, ss);
227        }
228
229        /**
230         * Run server from command line. Inbound and outbound port numbers should be
231         * provided as arguments, and a file containing a list of Applications to
232         * use can also be specified as an optional argument (as per
233         * <code>super.loadApplicationsFromFile(...)</code>). Uses the default
234         * LowerLayerProtocol.
235         */
236        public static void main(String args[]) {
237                if (args.length < 2 || args.length > 3) {
238                        System.out
239                                        .println("Usage: ca.uhn.hl7v2.app.TwoPortService inbound_port outbound_port [application_spec_file_name]");
240                        System.exit(1);
241                }
242
243                int inPort = 0;
244                int outPort = 0;
245                try {
246                        inPort = Integer.parseInt(args[0]);
247                        outPort = Integer.parseInt(args[1]);
248                } catch (NumberFormatException e) {
249                        System.err.println("One of the given ports (" + args[0] + " or "
250                                        + args[1] + ") is not an integer.");
251                        System.exit(1);
252                }
253
254                File appFile = null;
255                if (args.length == 3) {
256                        appFile = new File(args[2]);
257                }
258
259                try {
260                        TwoPortService server = new TwoPortService(inPort, outPort);
261                        if (appFile != null)
262                                server.loadApplicationsFromFile(appFile);
263                        server.start();
264                } catch (Exception e) {
265                        e.printStackTrace();
266                }
267
268        }
269
270}