001/** 002The contents of this file are subject to the Mozilla Public License Version 1.1 003(the "License"); you may not use this file except in compliance with the License. 004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005Software distributed under the License is distributed on an "AS IS" basis, 006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007specific language governing rights and limitations under the License. 008 009The Original Code is "ActiveConnection.java". Description: 010"A TCP/IP connection to a remote HL7 server." 011 012The Initial Developer of the Original Code is University Health Network. Copyright (C) 0132002. All Rights Reserved. 014 015Contributor(s): ______________________________________. 016 017Alternatively, the contents of this file may be used under the terms of the 018GNU General Public License (the �GPL�), in which case the provisions of the GPL are 019applicable instead of those above. If you wish to allow use of your version of this 020file only under the terms of the GPL and not to allow others to use your version 021of this file under the MPL, indicate your decision by deleting the provisions above 022and replace them with the notice and other provisions required by the GPL License. 023If you do not delete the provisions above, a recipient may use your version of 024this file under either the MPL or the GPL. 025 */ 026 027package ca.uhn.hl7v2.app; 028 029import java.io.IOException; 030import java.net.InetAddress; 031import java.net.Socket; 032import java.util.ArrayList; 033import java.util.Iterator; 034import java.util.List; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Future; 037import java.util.concurrent.TimeUnit; 038 039import javax.net.ssl.SSLSocket; 040 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import ca.uhn.hl7v2.concurrent.BlockingMap; 045import ca.uhn.hl7v2.concurrent.BlockingHashMap; 046import ca.uhn.hl7v2.concurrent.DefaultExecutorService; 047import ca.uhn.hl7v2.llp.HL7Writer; 048import ca.uhn.hl7v2.llp.LLPException; 049import ca.uhn.hl7v2.llp.LowerLayerProtocol; 050import ca.uhn.hl7v2.parser.Parser; 051 052/** 053 * A TCP/IP connection to a remote HL7 server. 054 * 055 * @author Bryan Tripp 056 */ 057public class ActiveConnection implements Connection { 058 059 private static final Logger log = LoggerFactory.getLogger(ActiveConnection.class); 060 061 private Initiator initiator; 062 private Responder responder; 063 private List<Socket> sockets; 064 private HL7Writer ackWriter; 065 private HL7Writer sendWriter; 066 private Parser parser; 067 private BlockingMap<String, String> responses; 068 private List<Receiver> receivers; 069 private boolean open = true; 070 private ExecutorService executorService; 071 072 /** 073 * Creates a new instance of Connection, with inbound and outbound 074 * communication on a single port. 075 */ 076 public ActiveConnection(Parser parser, LowerLayerProtocol llp, 077 Socket bidirectional) throws LLPException, IOException { 078 this(parser, llp, bidirectional, DefaultExecutorService 079 .getDefaultService()); 080 } 081 082 public ActiveConnection(Parser parser, LowerLayerProtocol llp, 083 Socket bidirectional, ExecutorService executorService) 084 throws LLPException, IOException { 085 init(parser, executorService, bidirectional); 086 ackWriter = llp.getWriter(bidirectional.getOutputStream()); 087 sendWriter = ackWriter; 088 this.executorService = executorService; 089 sockets.add(bidirectional); 090 receivers.add(new Receiver(this, llp.getReader(bidirectional 091 .getInputStream()))); 092 this.initiator = new ActiveInitiator(this); 093 } 094 095 /** 096 * Creates a new instance of Connection, with inbound communication on one 097 * port and outbound on another. 098 */ 099 public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound, 100 Socket outbound) throws LLPException, IOException { 101 this(parser, llp, inbound, outbound, DefaultExecutorService 102 .getDefaultService()); 103 } 104 105 /** 106 * Creates a new instance of Connection, with inbound communication on one 107 * port and outbound on another. 108 */ 109 public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound, 110 Socket outbound, ExecutorService executorService) 111 throws LLPException, IOException { 112 init(parser, executorService, inbound); 113 ackWriter = llp.getWriter(inbound.getOutputStream()); 114 sendWriter = llp.getWriter(outbound.getOutputStream()); 115 sockets.add(outbound); // always add outbound first ... see getRemoteAddress() 116 sockets.add(inbound); 117 118 receivers.add(new Receiver(this, 119 llp.getReader(inbound.getInputStream()))); 120 receivers.add(new Receiver(this, llp.getReader(outbound 121 .getInputStream()))); 122 this.initiator = new ActiveInitiator(this); 123 } 124 125 /** Common initialization tasks */ 126 private void init(Parser parser, ExecutorService executorService, Socket inboundSocket) 127 throws LLPException { 128 this.parser = parser; 129 this.executorService = executorService; 130 sockets = new ArrayList<Socket>(); 131 responses = new BlockingHashMap<String, String>(executorService); 132 receivers = new ArrayList<Receiver>(2); 133 responder = new Responder(inboundSocket); 134 } 135 136 /** 137 * Start the receiver thread(s) 138 */ 139 public void activate() { 140 if (receivers != null) { 141 for (Receiver receiver : receivers) { 142 receiver.start(); 143 } 144 } 145 } 146 147 public ExecutorService getExecutorService() { 148 return executorService; 149 } 150 151 /** 152 * Returns the address of the remote host to which this Connection is 153 * connected. If separate inbound and outbound sockets are used, the address 154 * of the outbound socket is returned (the addresses should normally be the 155 * same, but this isn't checked). 156 */ 157 public InetAddress getRemoteAddress() { 158 Socket s = sockets.get(0); 159 return s.getInetAddress(); 160 } 161 162 /** 163 * Returns the remote port on the remote host to which this Connection is 164 * connected. If separate inbound and outbound sockets are used, the port of 165 * the outbound socket is returned. 166 */ 167 public Integer getRemotePort() { 168 Socket s = sockets.get(0); 169 return s.getPort(); 170 } 171 172 /** Returns the Initiator associated with this connection */ 173 public Initiator getInitiator() { 174 return this.initiator; 175 } 176 177 /** Returns the Responder associated with this connection */ 178 public Responder getResponder() { 179 return this.responder; 180 } 181 182 public boolean isSecure() { 183 if (isOpen() && sockets.size() > 0) { 184 return (sockets.get(0) instanceof SSLSocket); 185 } else { 186 throw new IllegalStateException( 187 "Can't determine status on closed socket"); 188 } 189 } 190 191 /** 192 * Returns the HL7Writer through which unsolicited outbound messages should 193 * be sent. 194 */ 195 protected HL7Writer getSendWriter() { 196 return this.sendWriter; 197 } 198 199 /** 200 * Returns the HL7Writer through which responses to inbound messages should 201 * be sent. 202 */ 203 protected HL7Writer getAckWriter() { 204 return this.ackWriter; 205 } 206 207 public Parser getParser() { 208 return this.parser; 209 } 210 211 public String toString() { 212 StringBuilder buf = new StringBuilder(); 213 buf.append(getRemoteAddress().getHostName()); 214 buf.append(":"); 215 for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) { 216 Socket socket = iter.next(); 217 buf.append(socket.getPort()); 218 if (iter.hasNext()) 219 buf.append(","); 220 } 221 return buf.toString(); 222 } 223 224 /** 225 * Reserves a future incoming message by ack ID. When the incoming message 226 * with the given ack ID arrives, the message will be returned. 227 */ 228 protected Future<String> waitForResponse(final String messageID, 229 long timeout) throws InterruptedException { 230 return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS); 231 } 232 233 /** 234 * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread 235 * about a received response. 236 */ 237 protected boolean isRecipientWaiting(String ackID, String message) { 238 return responses.give(ackID, message); 239 } 240 241 /** Stops running Receiver threads and closes open sockets */ 242 public void close() { 243 // Mark all running receiver threads to be stopped 244 for (Receiver receiver : receivers) { 245 if (receiver.isRunning()) 246 receiver.stop(); 247 } 248 // Forces open sockets to be closed. This causes the Receiver threads to 249 // eventually terminate 250 for (Socket socket : sockets) { 251 try { 252 if (!socket.isClosed()) 253 socket.close(); 254 } catch (Exception e) { 255 log.error("Error while stopping threads and closing sockets", e); 256 } 257 } 258 259 open = false; 260 } 261 262 public boolean isOpen() { 263 return open; 264 } 265 266}