/**
The contents of this file are subject to the Mozilla Public License Version 1.1
(the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
specific language governing rights and limitations under the License.

The Original Code is "ActiveInitiator.java". Description:
"Performs the initiation role of a message exchange according to HL7's original
 mode rules."

The Initial Developer of the Original Code is University Health Network. Copyright (C)
2002. All Rights Reserved.

Contributor(s): ______________________________________.

Alternatively, the contents of this file may be used under the terms of the
GNU General Public License (the "GPL"), in which case the provisions of the GPL are
applicable instead of those above. If you wish to allow use of your version of this
file only under the terms of the GPL and not to allow others to use your version
of this file under the MPL, indicate your decision by deleting the provisions above
and replace them with the notice and other provisions required by the GPL License.
If you do not delete the provisions above, a recipient may use your version of
this file under either the MPL or the GPL.

 */

package ca.uhn.hl7v2.app;

import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import ca.uhn.hl7v2.ErrorCode;
import ca.uhn.hl7v2.HL7Exception;
import ca.uhn.hl7v2.llp.LLPException;
import ca.uhn.hl7v2.llp.LowerLayerProtocol;
import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.Parser;
import ca.uhn.hl7v2.parser.PipeParser;
import ca.uhn.hl7v2.util.Terser;
import ca.uhn.hl7v2.util.idgenerator.IDGenerator;
import ca.uhn.hl7v2.util.idgenerator.InMemoryIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Performs the initiation role of a message exchange (i.e. sender of the first
 * message; analogous to the client in a client-server interaction), according
 * to HL7's original mode processing rules.
 * </p>
 * <p>
 * The <code>sendAndReceive(...)</code> method blocks until either a response is
 * received with the matching message ID, or until a timeout period has passed.
 * The timeout defaults to 10000 ms (10 sec) but can be configured using
 * {@link #setTimeout(long, java.util.concurrent.TimeUnit)} or globally by setting
 * the system property "ca.uhn.hl7v2.app.initiator.timeout" to a long value
 * representing the number of ms after which to time out.
 * </p>
 * <p>
 * At the time of writing, enhanced mode, two-phase reply, continuation
 * messages, and batch processing are unsupported.
 * </p>
 *
 * @author Bryan Tripp
 */
public class ActiveInitiator implements Initiator {

    private static final Logger log = LoggerFactory.getLogger(ActiveInitiator.class);
    private static final Logger rawOutbound = LoggerFactory
            .getLogger("ca.uhn.hl7v2.raw.outbound");
    private static final Logger rawInbound = LoggerFactory
            .getLogger("ca.uhn.hl7v2.raw.inbound");

    /** Connection used to send requests and to wait for the matching responses. */
    private final ActiveConnection conn;

    /** How long to wait for a response, in milliseconds. */
    private long timeoutMillis = 10000;

    /**
     * Creates a new instance of ActiveInitiator. If the system property
     * "ca.uhn.hl7v2.app.initiator.timeout" holds a valid long, it replaces the
     * default timeout; an unparsable value is logged and ignored.
     *
     * @param conn
     *            the Connection associated with this ActiveInitiator.
     * @throws LLPException
     *             declared for callers; nothing in this constructor body
     *             throws it
     */
    ActiveInitiator(ActiveConnection conn) throws LLPException {
        this.conn = conn;

        // See if timeout has been set
        String timeout = System
                .getProperty("ca.uhn.hl7v2.app.initiator.timeout");
        if (timeout != null) {
            try {
                timeoutMillis = Long.parseLong(timeout);
                log.debug("Setting Initiator timeout to {} ms", timeout);
            } catch (NumberFormatException e) {
                log.warn("{} is not a valid long - Initiator is using default timeout",
                        timeout);
            }
        }
    }

    /**
     * Sends a message to a responder system, receives the reply, and returns
     * the reply as a Message object. This method is thread-safe - multiple
     * threads can share an Initiator and call this method. Responses are
     * returned to the calling thread on the basis of message ID.
     *
     * @param out
     *            the message to send; must not be null and must carry a
     *            control ID in MSH-10
     * @return the parsed response message
     * @throws HL7Exception
     *             if <code>out</code> is null, if MSH-10 is missing or empty,
     *             or if no response could be obtained (reported as a
     *             TimeoutException)
     * @throws LLPException
     *             declared for failures of the lower layer protocol while
     *             writing the message
     * @throws IOException
     *             if an I/O error occurs; the pending response is cancelled
     *             and the connection is closed before the exception is
     *             rethrown
     */
    public Message sendAndReceive(Message out) throws HL7Exception,
            LLPException, IOException {
        if (out == null) {
            throw new HL7Exception("Can't encode null message",
                    ErrorCode.REQUIRED_FIELD_MISSING);
        }

        // register message with response Receiver(s) (by message ID)
        Terser t = new Terser(out);
        String messID = t.get("/MSH-10");

        if (messID == null || messID.length() == 0) {
            throw new HL7Exception(
                    "MSH segment missing required field Control ID (MSH-10)",
                    ErrorCode.REQUIRED_FIELD_MISSING);
        }

        // Snapshot the timeout so the value used for waiting and the value
        // reported in the error message agree even if setTimeout(...) is
        // called concurrently.
        final long timeout = timeoutMillis;

        // log and send message
        String outbound = conn.getParser().encode(out);
        rawOutbound.debug(outbound);
        Future<String> inbound = null;
        try {
            String message;
            // Register for the response before writing the request.
            inbound = conn.waitForResponse(messID, timeout);
            conn.getSendWriter().writeMessage(outbound);
            if (inbound != null && (message = inbound.get()) != null) {
                // log that we got the message
                log.debug("Initiator received message: {}", message);
                rawInbound.debug(message);
                Message response = conn.getParser().parse(message);
                log.debug("response parsed");
                return response;
            }
        } catch (IOException e) {
            if (inbound != null) {
                inbound.cancel(true);
            }
            conn.close();
            throw e;
        } catch (InterruptedException e) {
            // Stop waiting for a response nobody will collect, and restore the
            // interrupt status so callers further up the stack can observe it.
            if (inbound != null) {
                inbound.cancel(true);
            }
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for response to message with control ID "
                    + messID, e);
        } catch (ExecutionException e) {
            // Record the underlying failure instead of discarding it; the
            // caller still receives the TimeoutException below.
            log.warn("Failed to obtain response to message with control ID "
                    + messID, e);
        }

        throw new TimeoutException("Timeout waiting for response to message with control ID "
                + messID + " after " + timeout + " ms.");
    }

    /**
     * Sets the timeout to wait for a response from the server
     *
     * @param timeout time in milliseconds
     */
    public void setTimeoutMillis(int timeout) {
        setTimeout(timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Sets the timeout to wait for a response from the server
     *
     * @param timeout time duration
     * @param timeUnit time unit
     */
    public void setTimeout(long timeout, TimeUnit timeUnit) {
        this.timeoutMillis = timeUnit.toMillis(timeout);
    }

    /**
     * Test harness: connects to the given host and port and starts 1000
     * threads, each of which sends one message and checks that the ID in
     * MSA-2 of the reply matches the ID it sent in MSH-10.
     *
     * @param args host name and port number
     */
    public static void main(String args[]) {
        if (args.length != 2) {
            System.out.println("Usage: ca.uhn.hl7v2.app.ActiveInitiator host port");
            // Without both arguments there is nothing to connect to.
            return;
        }

        try {

            // set up connection to server
            String host = args[0];
            int port = Integer.parseInt(args[1]);

            final Parser parser = new PipeParser();
            LowerLayerProtocol llp = new MinLowerLayerProtocol();
            Connection connection = new ActiveConnection(parser, llp, new Socket(
                    host, port));
            final Initiator initiator = connection.getInitiator();
            connection.activate();
            final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA";
            final IDGenerator generator = new InMemoryIDGenerator();

            // get a bunch of threads to send messages
            for (int i = 0; i < 1000; i++) {
                Thread sender = new Thread(new Runnable() {

                    public void run() {
                        try {
                            // get message ID
                            String ID = generator.getID();
                            Message out = parser.parse(outText);
                            Terser tOut = new Terser(out);
                            tOut.set("/MSH-10", ID);

                            // send, get response
                            Message in = initiator.sendAndReceive(out);
                            // get ACK ID
                            Terser tIn = new Terser(in);
                            String ackID = tIn.get("/MSA-2");
                            if (ID.equals(ackID)) {
                                System.out.println("OK - ack ID matches");
                            } else {
                                throw new RuntimeException(
                                        "Ack ID for message " + ID + " is "
                                                + ackID);
                            }

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                sender.start();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}