1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package ca.uhn.hl7v2.app;
30
31 import java.io.IOException;
32 import java.net.Socket;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import ca.uhn.hl7v2.ErrorCode;
38 import ca.uhn.hl7v2.HL7Exception;
39 import ca.uhn.hl7v2.llp.LLPException;
40 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
41 import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
42 import ca.uhn.hl7v2.model.Message;
43 import ca.uhn.hl7v2.parser.Parser;
44 import ca.uhn.hl7v2.parser.PipeParser;
45 import ca.uhn.hl7v2.util.Terser;
46 import ca.uhn.hl7v2.util.idgenerator.IDGenerator;
47 import ca.uhn.hl7v2.util.idgenerator.InMemoryIDGenerator;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public class ActiveInitiator implements Initiator {
73
74 private static final Logger log = LoggerFactory.getLogger(ActiveInitiator.class);
75 private static final Logger rawOutbound = LoggerFactory
76 .getLogger("ca.uhn.hl7v2.raw.outbound");
77 private static final Logger rawInbound = LoggerFactory
78 .getLogger("ca.uhn.hl7v2.raw.inbound");
79 private final ActiveConnection conn;
80 private long timeoutMillis = 10000;
81
82
83
84
85
86
87
88 ActiveInitiator(ActiveConnection conn) {
89 this.conn = conn;
90
91
92 String timeout = System
93 .getProperty("ca.uhn.hl7v2.app.initiator.timeout");
94 if (timeout != null) {
95 try {
96 timeoutMillis = Long.parseLong(timeout);
97 log.debug("Setting Initiator timeout to {} ms", timeout);
98 } catch (NumberFormatException e) {
99 log.warn(timeout
100 + " is not a valid long - Initiator is using default timeout");
101 }
102 }
103 }
104
105
106
107
108
109
110
111 public Message../../ca/uhn/hl7v2/model/Message.html#Message">Message sendAndReceive(Message out) throws HL7Exception,
112 LLPException, IOException {
113 if (out == null) {
114 throw new HL7Exception("Can't encode null message",
115 ErrorCode.REQUIRED_FIELD_MISSING);
116 }
117
118
119 Terser t = new Terser(out);
120 String messID = t.get("/MSH-10");
121
122 if (messID == null || messID.length() == 0) {
123 throw new HL7Exception(
124 "MSH segment missing required field Control ID (MSH-10)",
125 ErrorCode.REQUIRED_FIELD_MISSING);
126 }
127
128
129 String outbound = conn.getParser().encode(out);
130 rawOutbound.debug(outbound);
131 Future<String> inbound = null;
132 try {
133 String message;
134 inbound = conn.waitForResponse(messID, timeoutMillis);
135 conn.getSendWriter().writeMessage(outbound);
136 if (inbound != null && (message = inbound.get()) != null) {
137
138 log.debug("Initiator received message: {}", message);
139 rawInbound.debug(message);
140 Message response = conn.getParser().parse(message);
141 log.debug("response parsed");
142 return response;
143 }
144 } catch (IOException e) {
145 if (inbound != null)
146 inbound.cancel(true);
147 conn.close();
148 throw e;
149 } catch (InterruptedException | ExecutionException ignored) {
150 }
151
152 throw new TimeoutException("Timeout waiting for response to message with control ID "
153 + messID + " after " + timeoutMillis + " ms.");
154 }
155
156
157
158
159
160
161 public void setTimeoutMillis(int timeout) {
162 setTimeout(timeout, TimeUnit.MILLISECONDS);
163 }
164
165
166
167
168
169
170
171
172 public void setTimeout(long timeout, TimeUnit timeUnit) {
173 this.timeoutMillis = timeUnit.toMillis(timeout);
174 }
175
176
177
178
179 public static void main(String[] args) {
180 if (args.length != 2) {
181 System.out.println("Usage: ca.uhn.hl7v2.app.ActiveInitiator host port");
182 }
183
184 try {
185
186
187 String host = args[0];
188 int port = Integer.parseInt(args[1]);
189
190 final Parser parser = new PipeParser();
191 LowerLayerProtocol llp = new MinLowerLayerProtocol();
192 Connection connection = new ActiveConnection(parser, llp, new Socket(
193 host, port));
194 final Initiator initiator = connection.getInitiator();
195 connection.activate();
196 final String outText = "MSH|^~\\&|||||||ACK^^ACK|||R|2.4|\rMSA|AA";
197 final IDGenerator generator = new InMemoryIDGenerator();
198
199
200 for (int i = 0; i < 1000; i++) {
201 Thread sender = new Thread(() -> {
202 try {
203
204 String ID = generator.getID();
205 Message out = parser.parse(outText);
206 Terser tOut = new Terser(out);
207 tOut.set("/MSH-10", ID);
208
209
210 Message in = initiator.sendAndReceive(out);
211
212 Terser tIn = new Terser(in);
213 String ackID = tIn.get("/MSA-2");
214 if (ID.equals(ackID)) {
215 System.out.println("OK - ack ID matches");
216 } else {
217 throw new RuntimeException(
218 "Ack ID for message " + ID + " is "
219 + ackID);
220 }
221
222 } catch (Exception e) {
223 e.printStackTrace();
224 }
225 });
226 sender.start();
227 }
228
229 } catch (Exception e) {
230 e.printStackTrace();
231 }
232 }
233
234 }