Coverage Report - ca.uhn.hl7v2.app.ActiveConnection
 
Classes in this File Line Coverage Branch Coverage Complexity
ActiveConnection
81%
66/81
55%
11/20
1.65
 
 1  
 /**
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "ActiveConnection.java".  Description:
 10  
 "A TCP/IP connection to a remote HL7 server." 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2002.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): ______________________________________. 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
  */
 26  
 
 27  
 package ca.uhn.hl7v2.app;
 28  
 
 29  
 import java.io.IOException;
 30  
 import java.net.InetAddress;
 31  
 import java.net.Socket;
 32  
 import java.util.ArrayList;
 33  
 import java.util.Iterator;
 34  
 import java.util.List;
 35  
 import java.util.concurrent.ExecutorService;
 36  
 import java.util.concurrent.Future;
 37  
 import java.util.concurrent.TimeUnit;
 38  
 
 39  
 import javax.net.ssl.SSLSocket;
 40  
 
 41  
 import org.slf4j.Logger;
 42  
 import org.slf4j.LoggerFactory;
 43  
 
 44  
 import ca.uhn.hl7v2.concurrent.BlockingMap;
 45  
 import ca.uhn.hl7v2.concurrent.BlockingHashMap;
 46  
 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
 47  
 import ca.uhn.hl7v2.llp.HL7Writer;
 48  
 import ca.uhn.hl7v2.llp.LLPException;
 49  
 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
 50  
 import ca.uhn.hl7v2.parser.Parser;
 51  
 
 52  
 /**
 53  
  * A TCP/IP connection to a remote HL7 server.
 54  
  * 
 55  
  * @author Bryan Tripp
 56  
  */
 57  
 public class ActiveConnection implements Connection {
 58  
 
 59  5
         private static final Logger log = LoggerFactory.getLogger(ActiveConnection.class);
 60  
 
 61  
         private Initiator initiator;
 62  
         private Responder responder;
 63  
         private List<Socket> sockets;
 64  
         private HL7Writer ackWriter;
 65  
         private HL7Writer sendWriter;
 66  
         private Parser parser;
 67  
         private BlockingMap<String, String> responses;
 68  
         private List<Receiver> receivers;
 69  439
         private boolean open = true;
 70  
         private ExecutorService executorService;
 71  
 
 72  
         /**
 73  
          * Creates a new instance of Connection, with inbound and outbound
 74  
          * communication on a single port.
 75  
          */
 76  
         public ActiveConnection(Parser parser, LowerLayerProtocol llp,
 77  
                             Socket bidirectional) throws LLPException, IOException {
 78  40
                 this(parser, llp, bidirectional, DefaultExecutorService
 79  20
                                 .getDefaultService());
 80  20
         }
 81  
 
 82  
         public ActiveConnection(Parser parser, LowerLayerProtocol llp,
 83  
                             Socket bidirectional, ExecutorService executorService)
 84  335
                         throws LLPException, IOException {
 85  335
                 init(parser, executorService, bidirectional);
 86  335
                 ackWriter = llp.getWriter(bidirectional.getOutputStream());
 87  335
                 sendWriter = ackWriter;
 88  335
                 this.executorService = executorService;
 89  335
                 sockets.add(bidirectional);
 90  670
                 receivers.add(new Receiver(this, llp.getReader(bidirectional
 91  335
                                 .getInputStream())));
 92  335
                 this.initiator = new ActiveInitiator(this);
 93  335
         }
 94  
 
 95  
         /**
 96  
          * Creates a new instance of Connection, with inbound communication on one
 97  
          * port and outbound on another.
 98  
          */
 99  
         public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
 100  
                             Socket outbound) throws LLPException, IOException {
 101  20
                 this(parser, llp, inbound, outbound, DefaultExecutorService
 102  10
                                 .getDefaultService());
 103  10
         }
 104  
 
 105  
         /**
 106  
          * Creates a new instance of Connection, with inbound communication on one
 107  
          * port and outbound on another.
 108  
          */
 109  
         public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
 110  
                             Socket outbound, ExecutorService executorService)
 111  104
                         throws LLPException, IOException {
 112  104
                 init(parser, executorService, inbound);
 113  104
                 ackWriter = llp.getWriter(inbound.getOutputStream());
 114  104
                 sendWriter = llp.getWriter(outbound.getOutputStream());
 115  104
                 sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
 116  104
                 sockets.add(inbound);
 117  
 
 118  208
                 receivers.add(new Receiver(this,
 119  104
                                 llp.getReader(inbound.getInputStream())));
 120  208
                 receivers.add(new Receiver(this, llp.getReader(outbound
 121  104
                                 .getInputStream())));
 122  104
                 this.initiator = new ActiveInitiator(this);
 123  104
         }
 124  
 
 125  
         /** Common initialization tasks */
 126  
         private void init(Parser parser, ExecutorService executorService, Socket inboundSocket)
 127  
                         throws LLPException {
 128  439
                 this.parser = parser;
 129  439
                 this.executorService = executorService;
 130  439
                 sockets = new ArrayList<Socket>();
 131  439
                 responses = new BlockingHashMap<String, String>(executorService);
 132  439
                 receivers = new ArrayList<Receiver>(2);
 133  439
                 responder = new Responder(inboundSocket);
 134  439
         }
 135  
 
 136  
         /**
 137  
          * Start the receiver thread(s)
 138  
          */
 139  
         public void activate() {
 140  439
                 if (receivers != null) {
 141  439
                         for (Receiver receiver : receivers) {
 142  543
                                 receiver.start();
 143  543
                         }
 144  
                 }
 145  439
         }
 146  
 
 147  
         public ExecutorService getExecutorService() {
 148  543
                 return executorService;
 149  
         }
 150  
 
 151  
         /**
 152  
          * Returns the address of the remote host to which this Connection is
 153  
          * connected. If separate inbound and outbound sockets are used, the address
 154  
          * of the outbound socket is returned (the addresses should normally be the
 155  
          * same, but this isn't checked).
 156  
          */
 157  
         public InetAddress getRemoteAddress() {
 158  735
                 Socket s = sockets.get(0);
 159  735
                 return s.getInetAddress();
 160  
         }
 161  
 
 162  
         /**
 163  
          * Returns the remote port on the remote host to which this Connection is
 164  
          * connected. If separate inbound and outbound sockets are used, the port of
 165  
          * the outbound socket is returned.
 166  
          */
 167  
         public Integer getRemotePort() {
 168  509
                 Socket s = sockets.get(0);
 169  509
                 return s.getPort();
 170  
         }
 171  
 
 172  
         /** Returns the Initiator associated with this connection */
 173  
         public Initiator getInitiator() {
 174  608
                 return this.initiator;
 175  
         }
 176  
 
 177  
         /** Returns the Responder associated with this connection */
 178  
         public Responder getResponder() {
 179  867
                 return this.responder;
 180  
         }
 181  
 
 182  
         public boolean isSecure() {
 183  0
                 if (isOpen() && sockets.size() > 0) {
 184  0
                         return (sockets.get(0) instanceof SSLSocket);
 185  
                 } else {
 186  0
                         throw new IllegalStateException(
 187  
                                         "Can't determine status on closed socket");
 188  
                 }
 189  
         }
 190  
 
 191  
         /**
 192  
          * Returns the HL7Writer through which unsolicited outbound messages should
 193  
          * be sent.
 194  
          */
 195  
         protected HL7Writer getSendWriter() {
 196  588
                 return this.sendWriter;
 197  
         }
 198  
 
 199  
         /**
 200  
          * Returns the HL7Writer through which responses to inbound messages should
 201  
          * be sent.
 202  
          */
 203  
         protected HL7Writer getAckWriter() {
 204  623
                 return this.ackWriter;
 205  
         }
 206  
 
 207  
         public Parser getParser() {
 208  2377
                 return this.parser;
 209  
         }
 210  
 
 211  
         public String toString() {
 212  0
                 StringBuilder buf = new StringBuilder();
 213  0
                 buf.append(getRemoteAddress().getHostName());
 214  0
                 buf.append(":");
 215  0
                 for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
 216  0
                         Socket socket = iter.next();
 217  0
                         buf.append(socket.getPort());
 218  0
                         if (iter.hasNext())
 219  0
                                 buf.append(",");
 220  0
                 }
 221  0
                 return buf.toString();
 222  
         }
 223  
 
 224  
         /**
 225  
          * Reserves a future incoming message by ack ID. When the incoming message
 226  
          * with the given ack ID arrives, the message will be returned.
 227  
          */
 228  
         protected Future<String> waitForResponse(final String messageID,
 229  
                         long timeout) throws InterruptedException {
 230  588
                 return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
 231  
         }
 232  
 
 233  
         /**
 234  
          * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
 235  
          * about a received response.
 236  
          */
 237  
         protected boolean isRecipientWaiting(String ackID, String message) {
 238  578
                 return responses.give(ackID, message);
 239  
         }
 240  
 
 241  
         /** Stops running Receiver threads and closes open sockets */
 242  
         public void close() {
 243  
                 // Mark all running receiver threads to be stopped
 244  749
                 for (Receiver receiver : receivers) {
 245  1009
                         if (receiver.isRunning())
 246  538
                                 receiver.stop();
 247  1009
                 }
 248  
                 // Forces open sockets to be closed. This causes the Receiver threads to
 249  
                 // eventually terminate
 250  749
                 for (Socket socket : sockets) {
 251  
                         try {
 252  1009
                                 if (!socket.isClosed())
 253  552
                                         socket.close();
 254  0
                         } catch (Exception e) {
 255  0
                                 log.error("Error while stopping threads and closing sockets", e);
 256  1009
                         }
 257  1009
                 }
 258  
 
 259  749
                 open = false;
 260  749
         }
 261  
 
 262  
         public boolean isOpen() {
 263  1522
                 return open;
 264  
         }
 265  
 
 266  
 }