001/**
002The contents of this file are subject to the Mozilla Public License Version 1.1 
003(the "License"); you may not use this file except in compliance with the License. 
004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005Software distributed under the License is distributed on an "AS IS" basis, 
006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007specific language governing rights and limitations under the License. 
008
009The Original Code is "ActiveInitiator.java".  Description:
010"Performs the initiation role of a message exchange accorging to HL7's original 
011 mode rules." 
012
013The Initial Developer of the Original Code is University Health Network. Copyright (C) 
0142002.  All Rights Reserved. 
015
016Contributor(s): ______________________________________. 
017
018Alternatively, the contents of this file may be used under the terms of the 
019GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
020applicable instead of those above.  If you wish to allow use of your version of this 
021file only under the terms of the GPL and not to allow others to use your version 
022of this file under the MPL, indicate your decision by deleting  the provisions above 
023and replace  them with the notice and other provisions required by the GPL License.  
024If you do not delete the provisions above, a recipient may use your version of 
025this file under either the MPL or the GPL. 
026
027 */
028
029package ca.uhn.hl7v2.app;
030
031import java.io.IOException;
032import java.net.Socket;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.Future;
035import java.util.concurrent.TimeUnit;
036
037import ca.uhn.hl7v2.ErrorCode;
038import ca.uhn.hl7v2.HL7Exception;
039import ca.uhn.hl7v2.llp.LLPException;
040import ca.uhn.hl7v2.llp.LowerLayerProtocol;
041import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
042import ca.uhn.hl7v2.model.Message;
043import ca.uhn.hl7v2.parser.Parser;
044import ca.uhn.hl7v2.parser.PipeParser;
045import ca.uhn.hl7v2.util.Terser;
046import ca.uhn.hl7v2.util.idgenerator.IDGenerator;
047import ca.uhn.hl7v2.util.idgenerator.InMemoryIDGenerator;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * <p>
053 * Performs the initiation role of a message exchange (i.e sender of the first
054 * message; analogous to the client in a client-server interaction), according
055 * to HL7's original mode processing rules.
056 * </p>
057 * <p>
058 * The <code>sendAndReceive(...)</code> method blocks until either a response is
059 * received with the matching message ID, or until a timeout period has passed.
060 * The timeout defaults to 10000 ms (10 sec) but can be configured using
061 * {@link #setTimeout(long, java.util.concurrent.TimeUnit)} or globally by setting
062 * the system property "ca.uhn.hl7v2.app.initiator.timeout" to an long value
063 * representing the number of ms after which to time out.
064 * </p>
065 * <p>
066 * At the time of writing, enhanced mode, two-phase reply, continuation
067 * messages, and batch processing are unsupported.
068 * </p>
069 * 
070 * @author Bryan Tripp
071 */
072public class ActiveInitiator implements Initiator {
073
074        private static final Logger log = LoggerFactory.getLogger(ActiveInitiator.class);
075        private static final Logger rawOutbound = LoggerFactory
076                        .getLogger("ca.uhn.hl7v2.raw.outbound");
077        private static final Logger rawInbound = LoggerFactory
078                        .getLogger("ca.uhn.hl7v2.raw.inbound");
079        private ActiveConnection conn;
080        private long timeoutMillis = 10000;
081
082        /**
083         * Creates a new instance of ActiveInitiator.
084         * 
085         * @param conn
086         *            the Connection associated with this ActiveInitiator.
087         */
088        ActiveInitiator(ActiveConnection conn) throws LLPException {
089                this.conn = conn;
090
091                // See if timeout has been set
092                String timeout = System
093                                .getProperty("ca.uhn.hl7v2.app.initiator.timeout");
094                if (timeout != null) {
095                        try {
096                                timeoutMillis = Long.parseLong(timeout);
097                                log.debug("Setting Initiator timeout to {} ms", timeout);
098                        } catch (NumberFormatException e) {
099                                log.warn(timeout
100                                                + " is not a valid long - Initiator is using default timeout");
101                        }
102                }
103        }
104
105        /**
106         * Sends a message to a responder system, receives the reply, and returns
107         * the reply as a Message object. This method is thread-safe - multiple
108         * threads can share an Initiator and call this method. Responses are
109         * returned to the calling thread on the basis of message ID.
110         */
111        public Message sendAndReceive(Message out) throws HL7Exception,
112                        LLPException, IOException {
113                if (out == null) {
114                        throw new HL7Exception("Can't encode null message",
115                                        ErrorCode.REQUIRED_FIELD_MISSING);
116                }
117
118                // register message with response Receiver(s) (by message ID)
119                Terser t = new Terser(out);
120                String messID = t.get("/MSH-10");
121
122                if (messID == null || messID.length() == 0) {
123                        throw new HL7Exception(
124                                        "MSH segment missing required field Control ID (MSH-10)",
125                                        ErrorCode.REQUIRED_FIELD_MISSING);
126                }
127
128                // log and send message
129                String outbound = conn.getParser().encode(out);
130                rawOutbound.debug(outbound);
131                Future<String> inbound = null;
132                try {
133                        String message;
134                        inbound = conn.waitForResponse(messID, timeoutMillis);
135                        conn.getSendWriter().writeMessage(outbound);
136                        if (inbound != null && (message = inbound.get()) != null) {
137                                // log that we got the message
138                                log.debug("Initiator received message: {}", message);
139                                rawInbound.debug(message);
140                                Message response = conn.getParser().parse(message);
141                                log.debug("response parsed");
142                                return response;
143                        }
144                } catch (IOException e) {
145                        if (inbound != null)
146                                inbound.cancel(true);
147                        conn.close();
148                        throw e;
149                } catch (InterruptedException e) {
150                } catch (ExecutionException e) {
151                }
152
153                throw new TimeoutException("Timeout waiting for response to message with control ID "
154                                                + messID + " after " + timeoutMillis + " ms.");
155        }
156
157        /**
158         * Sets the timeout to wait for a response from the server
159         *
160         * @param timeout time in milliseconds
161         */
162        public void setTimeoutMillis(int timeout) {
163                setTimeout(timeout, TimeUnit.MILLISECONDS);
164        }
165
166        /**
167         *
168         * Sets the timeout to wait for a response from the server
169         *
170         * @param timeout  time duration
171         * @param timeUnit time unit
172         */
173    public void setTimeout(long timeout, TimeUnit timeUnit) {
174        this.timeoutMillis = timeUnit.toMillis(timeout);
175    }
176
177    /**
178         * Test harness
179         */
180        public static void main(String args[]) {
181                if (args.length != 2) {
182                        System.out.println("Usage: ca.uhn.hl7v2.app.ActiveInitiator host port");
183                }
184
185                try {
186
187                        // set up connection to server
188                        String host = args[0];
189                        int port = Integer.parseInt(args[1]);
190
191                        final Parser parser = new PipeParser();
192                        LowerLayerProtocol llp = new MinLowerLayerProtocol();
193                        Connection connection = new ActiveConnection(parser, llp, new Socket(
194                                        host, port));
195                        final Initiator initiator = connection.getInitiator();
196                        connection.activate();
197                        final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA";
198                        final IDGenerator generator = new InMemoryIDGenerator();
199
200                        // get a bunch of threads to send messages
201                        for (int i = 0; i < 1000; i++) {
202                                Thread sender = new Thread(new Runnable() {
203                                        
204                                        public void run() {
205                                                try {
206                                                        // get message ID
207                                                        String ID = generator.getID();
208                                                        Message out = parser.parse(outText);
209                                                        Terser tOut = new Terser(out);
210                                                        tOut.set("/MSH-10", ID);
211
212                                                        // send, get response
213                                                        Message in = initiator.sendAndReceive(out);
214                                                        // get ACK ID
215                                                        Terser tIn = new Terser(in);
216                                                        String ackID = tIn.get("/MSA-2");
217                                                        if (ID.equals(ackID)) {
218                                                                System.out.println("OK - ack ID matches");
219                                                        } else {
220                                                                throw new RuntimeException(
221                                                                                "Ack ID for message " + ID + " is "
222                                                                                                + ackID);
223                                                        }
224
225                                                } catch (Exception e) {
226                                                        e.printStackTrace();
227                                                }
228                                        }
229                                });
230                                sender.start();
231                        }
232
233                } catch (Exception e) {
234                        e.printStackTrace();
235                }
236        }
237
238}