Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
ActiveInitiator |
|
| 4.666666666666667;4.667 | ||||
ActiveInitiator$1 |
|
| 4.666666666666667;4.667 |
1 | /** | |
2 | The contents of this file are subject to the Mozilla Public License Version 1.1 | |
3 | (the "License"); you may not use this file except in compliance with the License. | |
4 | You may obtain a copy of the License at http://www.mozilla.org/MPL/ | |
5 | Software distributed under the License is distributed on an "AS IS" basis, | |
6 | WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the | |
7 | specific language governing rights and limitations under the License. | |
8 | ||
9 | The Original Code is "ActiveInitiator.java". Description: | |
10 | "Performs the initiation role of a message exchange accorging to HL7's original | |
11 | mode rules." | |
12 | ||
13 | The Initial Developer of the Original Code is University Health Network. Copyright (C) | |
14 | 2002. All Rights Reserved. | |
15 | ||
16 | Contributor(s): ______________________________________. | |
17 | ||
18 | Alternatively, the contents of this file may be used under the terms of the | |
19 | GNU General Public License (the �GPL�), in which case the provisions of the GPL are | |
20 | applicable instead of those above. If you wish to allow use of your version of this | |
21 | file only under the terms of the GPL and not to allow others to use your version | |
22 | of this file under the MPL, indicate your decision by deleting the provisions above | |
23 | and replace them with the notice and other provisions required by the GPL License. | |
24 | If you do not delete the provisions above, a recipient may use your version of | |
25 | this file under either the MPL or the GPL. | |
26 | ||
27 | */ | |
28 | ||
29 | package ca.uhn.hl7v2.app; | |
30 | ||
31 | import java.io.IOException; | |
32 | import java.net.Socket; | |
33 | import java.util.concurrent.ExecutionException; | |
34 | import java.util.concurrent.Future; | |
35 | import java.util.concurrent.TimeUnit; | |
36 | ||
37 | import ca.uhn.hl7v2.ErrorCode; | |
38 | import ca.uhn.hl7v2.HL7Exception; | |
39 | import ca.uhn.hl7v2.llp.LLPException; | |
40 | import ca.uhn.hl7v2.llp.LowerLayerProtocol; | |
41 | import ca.uhn.hl7v2.llp.MinLowerLayerProtocol; | |
42 | import ca.uhn.hl7v2.model.Message; | |
43 | import ca.uhn.hl7v2.parser.Parser; | |
44 | import ca.uhn.hl7v2.parser.PipeParser; | |
45 | import ca.uhn.hl7v2.util.Terser; | |
46 | import ca.uhn.hl7v2.util.idgenerator.IDGenerator; | |
47 | import ca.uhn.hl7v2.util.idgenerator.InMemoryIDGenerator; | |
48 | import org.slf4j.Logger; | |
49 | import org.slf4j.LoggerFactory; | |
50 | ||
51 | /** | |
52 | * <p> | |
53 | * Performs the initiation role of a message exchange (i.e sender of the first | |
54 | * message; analogous to the client in a client-server interaction), according | |
55 | * to HL7's original mode processing rules. | |
56 | * </p> | |
57 | * <p> | |
58 | * The <code>sendAndReceive(...)</code> method blocks until either a response is | |
59 | * received with the matching message ID, or until a timeout period has passed. | |
60 | * The timeout defaults to 10000 ms (10 sec) but can be configured using | |
61 | * {@link #setTimeout(long, java.util.concurrent.TimeUnit)} or globally by setting | |
62 | * the system property "ca.uhn.hl7v2.app.initiator.timeout" to an long value | |
63 | * representing the number of ms after which to time out. | |
64 | * </p> | |
65 | * <p> | |
66 | * At the time of writing, enhanced mode, two-phase reply, continuation | |
67 | * messages, and batch processing are unsupported. | |
68 | * </p> | |
69 | * | |
70 | * @author Bryan Tripp | |
71 | */ | |
72 | public class ActiveInitiator implements Initiator { | |
73 | ||
74 | 5 | private static final Logger log = LoggerFactory.getLogger(ActiveInitiator.class); |
75 | 5 | private static final Logger rawOutbound = LoggerFactory |
76 | 5 | .getLogger("ca.uhn.hl7v2.raw.outbound"); |
77 | 10 | private static final Logger rawInbound = LoggerFactory |
78 | 5 | .getLogger("ca.uhn.hl7v2.raw.inbound"); |
79 | private ActiveConnection conn; | |
80 | 439 | private long timeoutMillis = 10000; |
81 | ||
82 | /** | |
83 | * Creates a new instance of ActiveInitiator. | |
84 | * | |
85 | * @param conn | |
86 | * the Connection associated with this ActiveInitiator. | |
87 | */ | |
88 | 439 | ActiveInitiator(ActiveConnection conn) throws LLPException { |
89 | 439 | this.conn = conn; |
90 | ||
91 | // See if timeout has been set | |
92 | 439 | String timeout = System |
93 | 439 | .getProperty("ca.uhn.hl7v2.app.initiator.timeout"); |
94 | 439 | if (timeout != null) { |
95 | try { | |
96 | 0 | timeoutMillis = Long.parseLong(timeout); |
97 | 0 | log.debug("Setting Initiator timeout to {} ms", timeout); |
98 | 0 | } catch (NumberFormatException e) { |
99 | 0 | log.warn(timeout |
100 | + " is not a valid long - Initiator is using default timeout"); | |
101 | 0 | } |
102 | } | |
103 | 439 | } |
104 | ||
105 | /** | |
106 | * Sends a message to a responder system, receives the reply, and returns | |
107 | * the reply as a Message object. This method is thread-safe - multiple | |
108 | * threads can share an Initiator and call this method. Responses are | |
109 | * returned to the calling thread on the basis of message ID. | |
110 | */ | |
111 | public Message sendAndReceive(Message out) throws HL7Exception, | |
112 | LLPException, IOException { | |
113 | 588 | if (out == null) { |
114 | 0 | throw new HL7Exception("Can't encode null message", |
115 | ErrorCode.REQUIRED_FIELD_MISSING); | |
116 | } | |
117 | ||
118 | // register message with response Receiver(s) (by message ID) | |
119 | 588 | Terser t = new Terser(out); |
120 | 588 | String messID = t.get("/MSH-10"); |
121 | ||
122 | 588 | if (messID == null || messID.length() == 0) { |
123 | 0 | throw new HL7Exception( |
124 | "MSH segment missing required field Control ID (MSH-10)", | |
125 | ErrorCode.REQUIRED_FIELD_MISSING); | |
126 | } | |
127 | ||
128 | // log and send message | |
129 | 588 | String outbound = conn.getParser().encode(out); |
130 | 588 | rawOutbound.debug(outbound); |
131 | 588 | Future<String> inbound = null; |
132 | try { | |
133 | String message; | |
134 | 588 | inbound = conn.waitForResponse(messID, timeoutMillis); |
135 | 588 | conn.getSendWriter().writeMessage(outbound); |
136 | 588 | if (inbound != null && (message = inbound.get()) != null) { |
137 | // log that we got the message | |
138 | 578 | log.debug("Initiator received message: {}", message); |
139 | 578 | rawInbound.debug(message); |
140 | 578 | Message response = conn.getParser().parse(message); |
141 | 578 | log.debug("response parsed"); |
142 | 578 | return response; |
143 | } | |
144 | 0 | } catch (IOException e) { |
145 | 0 | if (inbound != null) |
146 | 0 | inbound.cancel(true); |
147 | 0 | conn.close(); |
148 | 0 | throw e; |
149 | 0 | } catch (InterruptedException e) { |
150 | 0 | } catch (ExecutionException e) { |
151 | 10 | } |
152 | ||
153 | 10 | throw new TimeoutException("Timeout waiting for response to message with control ID " |
154 | + messID + " after " + timeoutMillis + " ms."); | |
155 | } | |
156 | ||
157 | /** | |
158 | * Sets the timeout to wait for a response from the server | |
159 | * | |
160 | * @param timeout time in milliseconds | |
161 | */ | |
162 | public void setTimeoutMillis(int timeout) { | |
163 | 5 | setTimeout(timeout, TimeUnit.MILLISECONDS); |
164 | 5 | } |
165 | ||
166 | /** | |
167 | * | |
168 | * Sets the timeout to wait for a response from the server | |
169 | * | |
170 | * @param timeout time duration | |
171 | * @param timeUnit time unit | |
172 | */ | |
173 | public void setTimeout(long timeout, TimeUnit timeUnit) { | |
174 | 25 | this.timeoutMillis = timeUnit.toMillis(timeout); |
175 | 25 | } |
176 | ||
177 | /** | |
178 | * Test harness | |
179 | */ | |
180 | public static void main(String args[]) { | |
181 | 0 | if (args.length != 2) { |
182 | 0 | System.out.println("Usage: ca.uhn.hl7v2.app.ActiveInitiator host port"); |
183 | } | |
184 | ||
185 | try { | |
186 | ||
187 | // set up connection to server | |
188 | 0 | String host = args[0]; |
189 | 0 | int port = Integer.parseInt(args[1]); |
190 | ||
191 | 0 | final Parser parser = new PipeParser(); |
192 | 0 | LowerLayerProtocol llp = new MinLowerLayerProtocol(); |
193 | 0 | Connection connection = new ActiveConnection(parser, llp, new Socket( |
194 | host, port)); | |
195 | 0 | final Initiator initiator = connection.getInitiator(); |
196 | 0 | connection.activate(); |
197 | 0 | final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA"; |
198 | 0 | final IDGenerator generator = new InMemoryIDGenerator(); |
199 | ||
200 | // get a bunch of threads to send messages | |
201 | 0 | for (int i = 0; i < 1000; i++) { |
202 | 0 | Thread sender = new Thread(new Runnable() { |
203 | ||
204 | public void run() { | |
205 | try { | |
206 | // get message ID | |
207 | 0 | String ID = generator.getID(); |
208 | 0 | Message out = parser.parse(outText); |
209 | 0 | Terser tOut = new Terser(out); |
210 | 0 | tOut.set("/MSH-10", ID); |
211 | ||
212 | // send, get response | |
213 | 0 | Message in = initiator.sendAndReceive(out); |
214 | // get ACK ID | |
215 | 0 | Terser tIn = new Terser(in); |
216 | 0 | String ackID = tIn.get("/MSA-2"); |
217 | 0 | if (ID.equals(ackID)) { |
218 | 0 | System.out.println("OK - ack ID matches"); |
219 | } else { | |
220 | 0 | throw new RuntimeException( |
221 | "Ack ID for message " + ID + " is " | |
222 | + ackID); | |
223 | } | |
224 | ||
225 | 0 | } catch (Exception e) { |
226 | 0 | e.printStackTrace(); |
227 | 0 | } |
228 | 0 | } |
229 | }); | |
230 | 0 | sender.start(); |
231 | } | |
232 | ||
233 | 0 | } catch (Exception e) { |
234 | 0 | e.printStackTrace(); |
235 | 0 | } |
236 | 0 | } |
237 | ||
238 | } |