001/** 002The contents of this file are subject to the Mozilla Public License Version 1.1 003(the "License"); you may not use this file except in compliance with the License. 004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005Software distributed under the License is distributed on an "AS IS" basis, 006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007specific language governing rights and limitations under the License. 008 009The Original Code is "TwoPortService.java". Description: 010"A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound messages." 011 012The Initial Developer of the Original Code is University Health Network. Copyright (C) 0132001. All Rights Reserved. 014 015Contributor(s): ______________________________________. 016 017Alternatively, the contents of this file may be used under the terms of the 018GNU General Public License (the �GPL�), in which case the provisions of the GPL are 019applicable instead of those above. If you wish to allow use of your version of this 020file only under the terms of the GPL and not to allow others to use your version 021of this file under the MPL, indicate your decision by deleting the provisions above 022and replace them with the notice and other provisions required by the GPL License. 023If you do not delete the provisions above, a recipient may use your version of 024this file under either the MPL or the GPL. 025 */ 026 027package ca.uhn.hl7v2.app; 028 029import java.io.File; 030import java.io.IOException; 031import java.net.Socket; 032import java.net.SocketException; 033import java.util.HashMap; 034import java.util.Map; 035import java.util.concurrent.BlockingQueue; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.TimeUnit; 039 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import ca.uhn.hl7v2.DefaultHapiContext; 044import ca.uhn.hl7v2.HapiContext; 045import ca.uhn.hl7v2.app.AcceptorThread.AcceptedSocket; 046import ca.uhn.hl7v2.concurrent.DefaultExecutorService; 047import ca.uhn.hl7v2.llp.LLPException; 048import ca.uhn.hl7v2.llp.LowerLayerProtocol; 049import ca.uhn.hl7v2.llp.MinLowerLayerProtocol; 050import ca.uhn.hl7v2.parser.Parser; 051import ca.uhn.hl7v2.parser.PipeParser; 052import ca.uhn.hl7v2.util.SocketFactory; 053 054/** 055 * A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound 056 * messages. A connection is only activated when the same remote host connects 057 * to both the inbound and outbound ports. 058 * 059 * @author Bryan Tripp 060 */ 061public class TwoPortService extends HL7Service { 062 063 private static final Logger log = LoggerFactory 064 .getLogger(TwoPortService.class); 065 066 private Map<String, AcceptedSocket> waitingForSecondSocket = new HashMap<String, AcceptedSocket>(); 067 private int inboundPort; 068 private int outboundPort; 069 private boolean tls; 070 private BlockingQueue<AcceptedSocket> queue; 071 private AcceptorThread inboundAcceptor, outboundAcceptor; 072 private final HapiContext hapiContext; 073 074 public TwoPortService(int inboundPort, int outboundPort) { 075 this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort, 076 outboundPort, false); 077 } 078 079 public TwoPortService(int inboundPort, int outboundPort, boolean tls) { 080 this(new PipeParser(), new MinLowerLayerProtocol(), inboundPort, 081 outboundPort, tls); 082 } 083 084 /** Creates a new instance of TwoPortService */ 085 public TwoPortService(Parser parser, LowerLayerProtocol llp, 086 int inboundPort, int outboundPort, boolean tls) { 087 this(parser, llp, inboundPort, outboundPort, tls, 088 DefaultExecutorService.getDefaultService()); 089 } 090 091 /** Creates a new instance of TwoPortService */ 092 public TwoPortService(HapiContext hapiContext, 093 int inboundPort, int outboundPort, boolean tls) { 094 super(hapiContext); 095 this.hapiContext = hapiContext; 096 this.queue = new LinkedBlockingQueue<AcceptedSocket>(); 097 this.inboundPort = inboundPort; 098 this.outboundPort = outboundPort; 099 this.tls = tls; 100 101 if (inboundPort == outboundPort) { 102 throw new IllegalArgumentException("Inbound port and outbound port can not be the same"); 103 } 104 if (inboundPort < 1) { 105 throw new IllegalArgumentException("Invalid inbound port"); 106 } 107 if (outboundPort < 1) { 108 throw new IllegalArgumentException("Invalid outbound port"); 109 } 110 111 } 112 113 /** Creates a new instance of TwoPortService */ 114 public TwoPortService(Parser parser, LowerLayerProtocol llp, 115 int inboundPort, int outboundPort, boolean tls, 116 ExecutorService executorService) { 117 super(parser, llp, executorService); 118 this.hapiContext = new DefaultHapiContext(); 119 this.queue = new LinkedBlockingQueue<AcceptedSocket>(); 120 this.inboundPort = inboundPort; 121 this.outboundPort = outboundPort; 122 this.tls = tls; 123 } 124 125 /** 126 * Launches two threads that concurrently listen on the inboundPort and 127 * outboundPort. 128 * 129 * @see ca.uhn.hl7v2.app.HL7Service#afterStartup() 130 */ 131 @Override 132 protected void afterStartup() { 133 try { 134 super.afterStartup(); 135 inboundAcceptor = createAcceptThread(inboundPort); 136 outboundAcceptor = createAcceptThread(outboundPort); 137 inboundAcceptor.start(); 138 outboundAcceptor.start(); 139 log.info("TwoPortService running on ports {} and {}", inboundPort, 140 outboundPort); 141 } catch (IOException e) { 142 log.error("Could not run TwoPortService on ports {} and {}", 143 inboundPort, outboundPort); 144 throw new RuntimeException(e); 145 } 146 } 147 148 /** 149 * Terminate the two acceptor threads 150 * 151 * @see ca.uhn.hl7v2.app.HL7Service#afterTermination() 152 */ 153 @Override 154 protected void afterTermination() { 155 super.afterTermination(); 156 inboundAcceptor.stop(); 157 outboundAcceptor.stop(); 158 } 159 160 /** 161 * Polls for accepted sockets 162 */ 163 protected void handle() { 164 if (inboundAcceptor.getServiceExitedWithException() != null) { 165 setServiceExitedWithException(inboundAcceptor.getServiceExitedWithException()); 166 } 167 if (outboundAcceptor.getServiceExitedWithException() != null) { 168 setServiceExitedWithException(outboundAcceptor.getServiceExitedWithException()); 169 } 170 171 try { 172 ActiveConnection conn = acceptConnection(queue.poll(2, TimeUnit.SECONDS)); 173 if (conn != null) { 174 log.info("Accepted connection from " 175 + conn.getRemoteAddress().getHostAddress()); 176 newConnection(conn); 177 } 178 } catch (Exception e) { 179 log.error("Error while accepting connections: ", e); 180 } 181 } 182 183 /** 184 * Helper method that checks whether the newSocket completes a two-port 185 * connection or not. If yes, the {@link ActiveConnection} object is created and 186 * returned. 187 */ 188 private ActiveConnection acceptConnection(AcceptedSocket newSocket) 189 throws LLPException, IOException { 190 ActiveConnection conn = null; 191 if (newSocket != null) { 192 String address = newSocket.socket.getInetAddress().getHostAddress(); 193 AcceptedSocket otherSocket = waitingForSecondSocket.remove(address); 194 if (otherSocket != null && otherSocket.origin != newSocket.origin) { 195 log.debug("Socket {} completes a two-port connection", 196 newSocket.socket); 197 Socket in = getInboundSocket(newSocket, otherSocket); 198 Socket out = getOutboundSocket(newSocket, otherSocket); 199 conn = new ActiveConnection(getParser(), getLlp(), in, out, 200 getExecutorService()); 201 } else { 202 log.debug( 203 "Registered {} Still waiting for second socket for two-port connection", 204 newSocket.socket); 205 waitingForSecondSocket.put(address, newSocket); 206 } 207 } 208 return conn; 209 } 210 211 private Socket getInboundSocket(AcceptedSocket socket1, 212 AcceptedSocket socket2) { 213 return socket1.origin == inboundAcceptor ? socket1.socket 214 : socket2.socket; 215 } 216 217 private Socket getOutboundSocket(AcceptedSocket socket1, 218 AcceptedSocket socket2) { 219 return socket1.origin == outboundAcceptor ? socket1.socket 220 : socket2.socket; 221 } 222 223 protected AcceptorThread createAcceptThread(int port) 224 throws SocketException, IOException { 225 SocketFactory ss = this.hapiContext.getSocketFactory(); 226 return new AcceptorThread(port, tls, getExecutorService(), queue, ss); 227 } 228 229 /** 230 * Run server from command line. Inbound and outbound port numbers should be 231 * provided as arguments, and a file containing a list of Applications to 232 * use can also be specified as an optional argument (as per 233 * <code>super.loadApplicationsFromFile(...)</code>). Uses the default 234 * LowerLayerProtocol. 235 */ 236 public static void main(String args[]) { 237 if (args.length < 2 || args.length > 3) { 238 System.out 239 .println("Usage: ca.uhn.hl7v2.app.TwoPortService inbound_port outbound_port [application_spec_file_name]"); 240 System.exit(1); 241 } 242 243 int inPort = 0; 244 int outPort = 0; 245 try { 246 inPort = Integer.parseInt(args[0]); 247 outPort = Integer.parseInt(args[1]); 248 } catch (NumberFormatException e) { 249 System.err.println("One of the given ports (" + args[0] + " or " 250 + args[1] + ") is not an integer."); 251 System.exit(1); 252 } 253 254 File appFile = null; 255 if (args.length == 3) { 256 appFile = new File(args[2]); 257 } 258 259 try { 260 TwoPortService server = new TwoPortService(inPort, outPort); 261 if (appFile != null) 262 server.loadApplicationsFromFile(appFile); 263 server.start(); 264 } catch (Exception e) { 265 e.printStackTrace(); 266 } 267 268 } 269 270}