View Javadoc
1   /**
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "ActiveConnection.java".  Description:
10  "A TCP/IP connection to a remote HL7 server." 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2002.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the  "GPL"), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25   */
26  
27  package ca.uhn.hl7v2.app;
28  
29  import java.io.IOException;
30  import java.net.InetAddress;
31  import java.net.Socket;
32  import java.util.ArrayList;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  
39  import javax.net.ssl.SSLSocket;
40  
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import ca.uhn.hl7v2.concurrent.BlockingMap;
45  import ca.uhn.hl7v2.app.Receiver.ReceiverParserExceptionHandler;
46  import ca.uhn.hl7v2.concurrent.BlockingHashMap;
47  import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
48  import ca.uhn.hl7v2.llp.HL7Writer;
49  import ca.uhn.hl7v2.llp.LLPException;
50  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
51  import ca.uhn.hl7v2.parser.Parser;
52  
53  /**
54   * A TCP/IP connection to a remote HL7 server.
55   * 
56   * @author Bryan Tripp
57   */
58  public class ActiveConnection implements Connection {
59  
60  	private static final Logger log = LoggerFactory.getLogger(ActiveConnection.class);
61  
62  	private final Initiator initiator;
63  	private Responder responder;
64  	private List<Socket> sockets;
65  	private final HL7Writer ackWriter;
66  	private final HL7Writer sendWriter;
67  	private Parser parser;
68  	private BlockingMap<String, String> responses;
69  	private List<Receiver> receivers;
70  	private boolean open = true;
71  	private ExecutorService executorService;
72  	private boolean acceptAll = false;
73  
74  	/**
75  	 * Creates a new instance of Connection, with inbound and outbound
76  	 * communication on a single port.
77  	 */
78  	public ActiveConnection(Parser parser, LowerLayerProtocol llp,
79                              Socket bidirectional) throws LLPException, IOException {
80  		this(parser, llp, bidirectional, DefaultExecutorService
81  				.getDefaultService());
82  	}
83  
84  	public ActiveConnection(Parser parser, LowerLayerProtocol llp,
85                              Socket bidirectional, ExecutorService executorService)
86  			throws LLPException, IOException {
87  		init(parser, executorService, bidirectional);
88  		ackWriter = llp.getWriter(bidirectional.getOutputStream());
89  		sendWriter = ackWriter;
90  		this.executorService = executorService;
91  		sockets.add(bidirectional);
92  		receivers.add(new Receiver(this, llp.getReader(bidirectional
93  				.getInputStream())));
94  		this.initiator = new ActiveInitiator(this);
95  	}
96  
97  	public ActiveConnection(Parser parser, LowerLayerProtocol llp,
98                              Socket bidirectional, ExecutorService executorService, 
99                              boolean acceptAllMsg)
100 			throws LLPException, IOException {
101 		this(parser, llp, bidirectional, executorService);
102 		acceptAll = acceptAllMsg;
103 	}
104 
105 	/**
106 	 * Creates a new instance of Connection, with inbound communication on one
107 	 * port and outbound on another.
108 	 */
109 	public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
110                             Socket outbound) throws LLPException, IOException {
111 		this(parser, llp, inbound, outbound, DefaultExecutorService
112 				.getDefaultService());
113 	}
114 
115 	/**
116 	 * Creates a new instance of Connection, with inbound communication on one
117 	 * port and outbound on another.
118 	 */
119 	public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
120                             Socket outbound, ExecutorService executorService)
121 			throws LLPException, IOException {
122 		init(parser, executorService, inbound);
123 		ackWriter = llp.getWriter(inbound.getOutputStream());
124 		sendWriter = llp.getWriter(outbound.getOutputStream());
125 		sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
126 		sockets.add(inbound);
127 
128 		receivers.add(new Receiver(this,
129 				llp.getReader(inbound.getInputStream())));
130 		receivers.add(new Receiver(this, llp.getReader(outbound
131 				.getInputStream())));
132 		this.initiator = new ActiveInitiator(this);
133 	}
134 
135 	/** Common initialization tasks */
136 	private void init(Parser parser, ExecutorService executorService, Socket inboundSocket) {
137 		this.parser = parser;
138 		this.executorService = executorService;
139 		sockets = new ArrayList<>();
140 		responses = new BlockingHashMap<>(executorService);
141 		receivers = new ArrayList<>(2);
142 		responder = new Responder(inboundSocket);
143 	}
144 	
145 	/**
146 	 * Register the parser exception handler for each of the receivers
147 	 */
148 	public void setReceiverParserExeptionHandler(ReceiverParserExceptionHandler parserExeptionHandler) {
149 		if (receivers != null) {
150 			for (Receiver receiver : receivers) {
151 				receiver.setParserExeptionHandler(parserExeptionHandler);
152 			}
153 		}
154 	}
155 
156 	/**
157 	 * Start the receiver thread(s)
158 	 */
159 	public void activate() {
160 		if (receivers != null) {
161 			for (Receiver receiver : receivers) {
162 				receiver.start();
163 			}
164 		}
165 	}
166 
167 	public ExecutorService getExecutorService() {
168 		return executorService;
169 	}
170 
171 	/**
172 	 * Returns the address of the remote host to which this Connection is
173 	 * connected. If separate inbound and outbound sockets are used, the address
174 	 * of the outbound socket is returned (the addresses should normally be the
175 	 * same, but this isn't checked).
176 	 */
177 	public InetAddress getRemoteAddress() {
178 		Socket s = sockets.get(0);
179 		return s.getInetAddress();
180 	}
181 
182 	/**
183 	 * Returns the remote port on the remote host to which this Connection is
184 	 * connected. If separate inbound and outbound sockets are used, the port of
185 	 * the outbound socket is returned.
186 	 */
187 	public Integer getRemotePort() {
188 		Socket s = sockets.get(0);
189 		return s.getPort();
190 	}
191 
192 	/** Returns the Initiator associated with this connection */
193 	public Initiator getInitiator() {
194 		return this.initiator;
195 	}
196 
197 	/** Returns the Responder associated with this connection */
198 	public Responder getResponder() {
199 		return this.responder;
200 	}
201 
202 	public boolean isSecure() {
203 		if (isOpen() && sockets.size() > 0) {
204 			return (sockets.get(0) instanceof SSLSocket);
205 		} else {
206 			throw new IllegalStateException(
207 					"Can't determine status on closed socket");
208 		}
209 	}
210 
211 	/**
212 	 * Returns the HL7Writer through which unsolicited outbound messages should
213 	 * be sent.
214 	 */
215 	protected HL7Writer getSendWriter() {
216 		return this.sendWriter;
217 	}
218 
219 	/**
220 	 * Returns the HL7Writer through which responses to inbound messages should
221 	 * be sent.
222 	 */
223 	protected HL7Writer getAckWriter() {
224 		return this.ackWriter;
225 	}
226 
227 	public Parser getParser() {
228 		return this.parser;
229 	}
230 
231 	public String toString() {
232 		StringBuilder buf = new StringBuilder();
233 		buf.append(getRemoteAddress().getHostName());
234 		buf.append(":");
235 		for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
236 			Socket socket = iter.next();
237 			buf.append(socket.getPort());
238 			if (iter.hasNext())
239 				buf.append(",");
240 		}
241 		return buf.toString();
242 	}
243 
244 	/**
245 	 * Reserves a future incoming message by ack ID. When the incoming message
246 	 * with the given ack ID arrives, the message will be returned.
247 	 */
248 	protected Future<String> waitForResponse(final String messageID,
249 			long timeout) throws InterruptedException {
250 		return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
251 	}
252 
253 	/**
254 	 * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
255 	 * about a received response.
256 	 */
257 	protected boolean isRecipientWaiting(String ackID, String message) {
258 		return responses.give(ackID, message);
259 	}
260 
261 	/** Stops running Receiver threads and closes open sockets */
262 	public void close() {
263 		// Mark all running receiver threads to be stopped
264 		for (Receiver receiver : receivers) {
265 			if (receiver.isRunning())
266 				receiver.stop();
267 		}
268 		// Forces open sockets to be closed. This causes the Receiver threads to
269 		// eventually terminate
270 		for (Socket socket : sockets) {
271 			try {
272 				if (!socket.isClosed())
273 					socket.close();
274 			} catch (Exception e) {
275 				log.error("Error while stopping threads and closing sockets", e);
276 			}
277 		}
278 
279 		open = false;
280 	}
281 
282 	public boolean isOpen() {
283 		return open;
284 	}
285 
286 	public boolean acceptAllMessages(){
287 		return acceptAll;
288 	}
289 
290 }