001/**
002The contents of this file are subject to the Mozilla Public License Version 1.1 
003(the "License"); you may not use this file except in compliance with the License. 
004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005Software distributed under the License is distributed on an "AS IS" basis, 
006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007specific language governing rights and limitations under the License. 
008
009The Original Code is "ActiveConnection.java".  Description:
010"A TCP/IP connection to a remote HL7 server." 
011
012The Initial Developer of the Original Code is University Health Network. Copyright (C) 
0132002.  All Rights Reserved. 
014
015Contributor(s): ______________________________________. 
016
017Alternatively, the contents of this file may be used under the terms of the 
018GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
019applicable instead of those above.  If you wish to allow use of your version of this 
020file only under the terms of the GPL and not to allow others to use your version 
021of this file under the MPL, indicate your decision by deleting  the provisions above 
022and replace  them with the notice and other provisions required by the GPL License.  
023If you do not delete the provisions above, a recipient may use your version of 
024this file under either the MPL or the GPL. 
025 */
026
027package ca.uhn.hl7v2.app;
028
029import java.io.IOException;
030import java.net.InetAddress;
031import java.net.Socket;
032import java.util.ArrayList;
033import java.util.Iterator;
034import java.util.List;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Future;
037import java.util.concurrent.TimeUnit;
038
039import javax.net.ssl.SSLSocket;
040
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import ca.uhn.hl7v2.concurrent.BlockingMap;
045import ca.uhn.hl7v2.concurrent.BlockingHashMap;
046import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
047import ca.uhn.hl7v2.llp.HL7Writer;
048import ca.uhn.hl7v2.llp.LLPException;
049import ca.uhn.hl7v2.llp.LowerLayerProtocol;
050import ca.uhn.hl7v2.parser.Parser;
051
052/**
053 * A TCP/IP connection to a remote HL7 server.
054 * 
055 * @author Bryan Tripp
056 */
057public class ActiveConnection implements Connection {
058
059        private static final Logger log = LoggerFactory.getLogger(ActiveConnection.class);
060
061        private Initiator initiator;
062        private Responder responder;
063        private List<Socket> sockets;
064        private HL7Writer ackWriter;
065        private HL7Writer sendWriter;
066        private Parser parser;
067        private BlockingMap<String, String> responses;
068        private List<Receiver> receivers;
069        private boolean open = true;
070        private ExecutorService executorService;
071
072        /**
073         * Creates a new instance of Connection, with inbound and outbound
074         * communication on a single port.
075         */
076        public ActiveConnection(Parser parser, LowerLayerProtocol llp,
077                            Socket bidirectional) throws LLPException, IOException {
078                this(parser, llp, bidirectional, DefaultExecutorService
079                                .getDefaultService());
080        }
081
082        public ActiveConnection(Parser parser, LowerLayerProtocol llp,
083                            Socket bidirectional, ExecutorService executorService)
084                        throws LLPException, IOException {
085                init(parser, executorService, bidirectional);
086                ackWriter = llp.getWriter(bidirectional.getOutputStream());
087                sendWriter = ackWriter;
088                this.executorService = executorService;
089                sockets.add(bidirectional);
090                receivers.add(new Receiver(this, llp.getReader(bidirectional
091                                .getInputStream())));
092                this.initiator = new ActiveInitiator(this);
093        }
094
095        /**
096         * Creates a new instance of Connection, with inbound communication on one
097         * port and outbound on another.
098         */
099        public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
100                            Socket outbound) throws LLPException, IOException {
101                this(parser, llp, inbound, outbound, DefaultExecutorService
102                                .getDefaultService());
103        }
104
105        /**
106         * Creates a new instance of Connection, with inbound communication on one
107         * port and outbound on another.
108         */
109        public ActiveConnection(Parser parser, LowerLayerProtocol llp, Socket inbound,
110                            Socket outbound, ExecutorService executorService)
111                        throws LLPException, IOException {
112                init(parser, executorService, inbound);
113                ackWriter = llp.getWriter(inbound.getOutputStream());
114                sendWriter = llp.getWriter(outbound.getOutputStream());
115                sockets.add(outbound); // always add outbound first ... see getRemoteAddress()
116                sockets.add(inbound);
117
118                receivers.add(new Receiver(this,
119                                llp.getReader(inbound.getInputStream())));
120                receivers.add(new Receiver(this, llp.getReader(outbound
121                                .getInputStream())));
122                this.initiator = new ActiveInitiator(this);
123        }
124
125        /** Common initialization tasks */
126        private void init(Parser parser, ExecutorService executorService, Socket inboundSocket)
127                        throws LLPException {
128                this.parser = parser;
129                this.executorService = executorService;
130                sockets = new ArrayList<Socket>();
131                responses = new BlockingHashMap<String, String>(executorService);
132                receivers = new ArrayList<Receiver>(2);
133                responder = new Responder(inboundSocket);
134        }
135
136        /**
137         * Start the receiver thread(s)
138         */
139        public void activate() {
140                if (receivers != null) {
141                        for (Receiver receiver : receivers) {
142                                receiver.start();
143                        }
144                }
145        }
146
147        public ExecutorService getExecutorService() {
148                return executorService;
149        }
150
151        /**
152         * Returns the address of the remote host to which this Connection is
153         * connected. If separate inbound and outbound sockets are used, the address
154         * of the outbound socket is returned (the addresses should normally be the
155         * same, but this isn't checked).
156         */
157        public InetAddress getRemoteAddress() {
158                Socket s = sockets.get(0);
159                return s.getInetAddress();
160        }
161
162        /**
163         * Returns the remote port on the remote host to which this Connection is
164         * connected. If separate inbound and outbound sockets are used, the port of
165         * the outbound socket is returned.
166         */
167        public Integer getRemotePort() {
168                Socket s = sockets.get(0);
169                return s.getPort();
170        }
171
172        /** Returns the Initiator associated with this connection */
173        public Initiator getInitiator() {
174                return this.initiator;
175        }
176
177        /** Returns the Responder associated with this connection */
178        public Responder getResponder() {
179                return this.responder;
180        }
181
182        public boolean isSecure() {
183                if (isOpen() && sockets.size() > 0) {
184                        return (sockets.get(0) instanceof SSLSocket);
185                } else {
186                        throw new IllegalStateException(
187                                        "Can't determine status on closed socket");
188                }
189        }
190
191        /**
192         * Returns the HL7Writer through which unsolicited outbound messages should
193         * be sent.
194         */
195        protected HL7Writer getSendWriter() {
196                return this.sendWriter;
197        }
198
199        /**
200         * Returns the HL7Writer through which responses to inbound messages should
201         * be sent.
202         */
203        protected HL7Writer getAckWriter() {
204                return this.ackWriter;
205        }
206
207        public Parser getParser() {
208                return this.parser;
209        }
210
211        public String toString() {
212                StringBuilder buf = new StringBuilder();
213                buf.append(getRemoteAddress().getHostName());
214                buf.append(":");
215                for (Iterator<Socket> iter = sockets.iterator(); iter.hasNext();) {
216                        Socket socket = iter.next();
217                        buf.append(socket.getPort());
218                        if (iter.hasNext())
219                                buf.append(",");
220                }
221                return buf.toString();
222        }
223
224        /**
225         * Reserves a future incoming message by ack ID. When the incoming message
226         * with the given ack ID arrives, the message will be returned.
227         */
228        protected Future<String> waitForResponse(final String messageID,
229                        long timeout) throws InterruptedException {
230                return responses.asyncPoll(messageID, timeout, TimeUnit.MILLISECONDS);
231        }
232
233        /**
234         * Given the ack ID (MSA-2) of a message, notifies a waiting consumer thread
235         * about a received response.
236         */
237        protected boolean isRecipientWaiting(String ackID, String message) {
238                return responses.give(ackID, message);
239        }
240
241        /** Stops running Receiver threads and closes open sockets */
242        public void close() {
243                // Mark all running receiver threads to be stopped
244                for (Receiver receiver : receivers) {
245                        if (receiver.isRunning())
246                                receiver.stop();
247                }
248                // Forces open sockets to be closed. This causes the Receiver threads to
249                // eventually terminate
250                for (Socket socket : sockets) {
251                        try {
252                                if (!socket.isClosed())
253                                        socket.close();
254                        } catch (Exception e) {
255                                log.error("Error while stopping threads and closing sockets", e);
256                        }
257                }
258
259                open = false;
260        }
261
262        public boolean isOpen() {
263                return open;
264        }
265
266}