1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package ca.uhn.hl7v2.testpanel.model.conn;
27
28 import java.awt.EventQueue;
29 import java.io.IOException;
30 import java.io.StringReader;
31 import java.io.StringWriter;
32 import java.util.Date;
33 import java.util.LinkedList;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36
37 import jakarta.xml.bind.JAXB;
38 import jakarta.xml.bind.annotation.XmlAccessType;
39 import jakarta.xml.bind.annotation.XmlAccessorType;
40 import jakarta.xml.bind.annotation.XmlType;
41
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import ca.uhn.hl7v2.HL7Exception;
46 import ca.uhn.hl7v2.app.Connection;
47 import ca.uhn.hl7v2.app.ConnectionHub;
48 import ca.uhn.hl7v2.llp.LLPException;
49 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
50 import ca.uhn.hl7v2.model.Message;
51 import ca.uhn.hl7v2.parser.EncodingCharacters;
52 import ca.uhn.hl7v2.parser.Parser;
53 import ca.uhn.hl7v2.testpanel.model.ActivityIncomingMessage;
54 import ca.uhn.hl7v2.testpanel.model.ActivityInfo;
55 import ca.uhn.hl7v2.testpanel.model.ActivityInfoError;
56 import ca.uhn.hl7v2.testpanel.model.ActivityOutgoingMessage;
57 import ca.uhn.hl7v2.testpanel.model.msg.AbstractMessage;
58 import ca.uhn.hl7v2.testpanel.model.msg.Hl7V2MessageBase;
59 import ca.uhn.hl7v2.testpanel.model.msg.Hl7V2MessageCollection;
60 import ca.uhn.hl7v2.testpanel.ui.IDestroyable;
61 import ca.uhn.hl7v2.testpanel.util.IProgressCallback.OperationCancelRequestedException;
62 import ca.uhn.hl7v2.testpanel.util.ISendProgressCallback;
63 import ca.uhn.hl7v2.util.SocketFactory;
64
65 @XmlAccessorType(XmlAccessType.FIELD)
66 @XmlType(name = "OutboundConnection")
67 public class OutboundConnection extends AbstractConnection implements IDestroyable {
68
69 private static final Logger ourLog = LoggerFactory.getLogger(OutboundConnection.class);
70
71 private transient Connection myConnection;
72 private transient ConnectionMonitorThread myConnectionMonitorThread;
73 private transient Parser myParser;
74 private transient MessageSenderThread myMessageSenderThread;
75
76
77 public OutboundConnection() {
78 }
79
80
81 @Override
82 public String exportConfigToXml() {
83 StringWriter writer = new StringWriter();
84 JAXB.marshal(this, writer);
85 return writer.toString();
86 }
87
88
89 public static OutboundConnection fromXml(String theXml) {
90 return JAXB.unmarshal(new StringReader(theXml), OutboundConnection.class);
91 }
92
93
94 public synchronized void sendMessages(Hl7V2MessageCollection theMessages, ISendProgressCallback theTransmissionCallback) {
95 if (myMessageSenderThread != null) {
96 throw new IllegalStateException("Already sending messages");
97 }
98 myMessageSenderThread = new MessageSenderThread(theMessages, theTransmissionCallback);
99 myMessageSenderThread.start();
100 }
101
102
103 public void start() {
104 super.start();
105
106 if (myConnectionMonitorThread != null) {
107 return;
108 }
109
110 myParser = createParser();
111
112 ourLog.info("Starting up outgoing interface {}", getName());
113
114 myConnectionMonitorThread = new ConnectionMonitorThread();
115 setStatus(StatusEnum.TRYING_TO_START);
116 myConnectionMonitorThread.start();
117 }
118
119
120 public void stop() {
121 super.stop();
122
123 ourLog.info("Shutting down outgoing interface {}", getName());
124
125 ConnectionMonitorThread thread = myConnectionMonitorThread;
126 myConnectionMonitorThread = null;
127
128 if (thread != null) {
129 thread.interrupt();
130 }
131
132 }
133
134 private class ConnectionMonitorThread extends Thread {
135
136 private CountDownLatch myStartupLatch = new CountDownLatch(1);
137
138
139 private void doRun() throws LLPException {
140 LowerLayerProtocol llpClass = null;
141 llpClass = createLlp();
142
143 boolean tls = isTls();
144
145 String desc = OutboundConnection.this.isNameIsExplicitlySet() ? OutboundConnection.this.getName() + " (" + OutboundConnection.this.createDescription() + ")" : OutboundConnection.this.createDescription();
146 ourLog.info("Starting outbound interface " + desc);
147
148 Connection connection = null;
149 while (myConnectionMonitorThread == this) {
150
151 if (getStatus() == StatusEnum.TRYING_TO_START) {
152 String msg = "Attempting outbound connection to " + desc;
153 ourLog.info(msg);
154 addActivityInfoInSwingThread(msg);
155
156 try {
157
158 SocketFactory socketFactory = getSocketFactory();
159
160 if (isDualPort()) {
161 connection = ConnectionHub.getInstance().attach(getHost(), getOutgoingPort(), getIncomingOrSinglePort(), myParser, llpClass, tls, socketFactory);
162 } else {
163 connection = ConnectionHub.getInstance().attach(getHost(), getIncomingOrSinglePort(), myParser, llpClass, tls, socketFactory);
164 }
165
166 myConnection = connection;
167
168 msg = "Successfully connected to " + createDescription();
169 ourLog.info(msg);
170 addActivityInfoInSwingThread(msg);
171
172 myStartupLatch.countDown();
173
174 } catch (HL7Exception e) {
175 Throwable ex = e;
176 if (e.getCause() != null) {
177 ex = e.getCause();
178 }
179
180 ourLog.warn("Failed to connect to " + createDescription() + " - Message was " + ex.getMessage());
181 addActivity(new ActivityInfoError(new Date(), "Failed to connect to " + createDescription() + " - " + ex.getMessage()));
182 }
183
184 if (myConnection != null) {
185 if (myConnection.isOpen()) {
186 setStatus(StatusEnum.STARTED);
187 setStatusLine("Connected");
188 } else {
189 setStatus(StatusEnum.TRYING_TO_START);
190 setStatusLine("Lost connection, retrying...");
191 }
192 }
193 if (myConnection == null) {
194 setStatus(StatusEnum.TRYING_TO_START);
195 setStatusLine("Trying to connect...");
196 }
197
198 try {
199 Thread.sleep(1000);
200 } catch (InterruptedException e) {
201
202 }
203
204 }
205
206 try {
207 Thread.sleep(250);
208 } catch (InterruptedException e) {
209
210 }
211
212 }
213
214 }
215
216
217
218
219
220 @Override
221 public void run() {
222 setStatusLine("Trying to connect...");
223
224 try {
225
226 doRun();
227 setStatusLine("Connection stopped");
228
229 } catch (Throwable e) {
230 ourLog.error("Connection failed with an unexpected error!", e);
231 setStatusLine("Failed with an error: " + e.getMessage());
232 } finally {
233 if (myConnection != null) {
234 ConnectionHub.getInstance().discard(myConnection);
235 }
236 myConnectionMonitorThread = null;
237 setStatus(StatusEnum.STOPPED);
238 }
239 }
240
241
242
243
244
245 public boolean waitUntilWeHaveAConnection() {
246 try {
247 return myStartupLatch.await(5000, TimeUnit.MILLISECONDS);
248 } catch (InterruptedException e) {
249 return false;
250 }
251 }
252
253 }
254
255 public class MessageSenderThread extends Thread {
256
257 private Hl7V2MessageCollection myMessages;
258 private ISendProgressCallback myTransmissionCallback;
259 private boolean myCancelled;
260 private long myStartTime;
261 private StatusEnum myInitialStatus;
262 private int myTotalMessages;
263 private int mySentMessages;
264 private LinkedList<Integer> myResponseTimes = new LinkedList<Integer>();
265 private long myLastUpdate = 0;
266
267
268 public MessageSenderThread(Hl7V2MessageCollection theMessages, ISendProgressCallback theTransmissionCallback) {
269 myMessages = theMessages;
270 myTransmissionCallback = theTransmissionCallback;
271 }
272
273
274 @Override
275 public void run() {
276
277 EventQueue.invokeLater(new Runnable() {
278 public void run() {
279 myTransmissionCallback.activityStarted();
280 }
281 });
282
283 try {
284
285 if (!doStart()) {
286 return;
287 }
288
289 int sendNumberOfTimes = myMessages.getSendNumberOfTimes();
290
291 myTotalMessages = myMessages.countMessagesOfType(Hl7V2MessageBase.class) * sendNumberOfTimes;
292 mySentMessages = 0;
293
294 myStartTime = System.currentTimeMillis();
295 for (int curRep = 1; curRep <= sendNumberOfTimes; curRep++) {
296 doSend();
297 }
298
299 sendUpdate(1.0);
300
301 doStop();
302
303 } finally {
304 EventQueue.invokeLater(new Runnable() {
305 public void run() {
306 myTransmissionCallback.activityStopped();
307 }
308 });
309 synchronized (OutboundConnection.this) {
310 myMessageSenderThread = null;
311 }
312
313 ourLog.info("Transmission thread shutting down");
314 }
315
316 }
317
318
319 private void doStop() {
320 long delay = System.currentTimeMillis() - myStartTime;
321 int i = myTotalMessages;
322
323 StringBuilder b = new StringBuilder();
324 b.append("Sent ");
325 b.append(i);
326 b.append(" message");
327 b.append((i != 1 ? "s" : ""));
328 b.append(" in ");
329 b.append(delay);
330 b.append("ms");
331 if (mySentMessages > 0) {
332 b.append("<br/>");
333 b.append("Average: ");
334 b.append(delay / mySentMessages);
335 b.append("ms / message");
336 }
337
338 addActivity(new ActivityInfo(new Date(), b.toString()));
339
340 if (myInitialStatus != StatusEnum.STARTED) {
341 OutboundConnection.this.stop();
342 }
343 }
344
345
346 private boolean doStart() {
347 myInitialStatus = getStatus();
348 if (myInitialStatus != StatusEnum.STARTED) {
349 OutboundConnection.this.start();
350 }
351
352 if (myConnectionMonitorThread == null) {
353 throw new IllegalStateException("Interface not started");
354 }
355
356 if (getStatus() != StatusEnum.STARTED) {
357 ourLog.info("Waiting for interface {} to start...", OutboundConnection.this.getName());
358 addActivity(new ActivityInfo(new Date(), "Starting interface \"" + OutboundConnection.this.getName() + "\"..."));
359
360 boolean gotConnection = myConnectionMonitorThread.waitUntilWeHaveAConnection();
361 if (!gotConnection) {
362 ourLog.info("Failed to connect to {}, shutting down interface and aborting send", createDescription());
363 addActivity(new ActivityInfoError(new Date(), "Failed to connect to interface. Aborting send."));
364 OutboundConnection.this.stop();
365 return false;
366 }
367 }
368
369 return true;
370 }
371
372
373 private void doSend() {
374 int i = 0;
375 for (AbstractMessage<?> abstractMessage : myMessages.getMessages()) {
376 if (myCancelled) {
377 return;
378 }
379
380 final double complete = (((double) mySentMessages) / myTotalMessages);
381 long now = System.currentTimeMillis();
382 long elapsedSinceLastUpdate = now - myLastUpdate;
383 if (elapsedSinceLastUpdate > 1000) {
384 sendUpdate(complete);
385 myLastUpdate = now;
386 }
387
388 i++;
389
390 if (abstractMessage instanceof Hl7V2MessageBase) {
391
392 Message msg = ((Hl7V2MessageBase) abstractMessage).getParsedMessage();
393 ourLog.info("Sending message " + i + "/" + myTotalMessages + " of type " + msg.getClass());
394 try {
395
396 beforeProcessingNewMessageOut();
397 addActivity(new ActivityOutgoingMessage(new Date(), getEncoding(), myParser.encode(msg), EncodingCharacters.getInstance(msg)));
398
399 long beforeSend = now;
400 Message response = myConnection.getInitiator().sendAndReceive(msg);
401
402 mySentMessages++;
403
404 long sendTime = now - beforeSend;
405 myResponseTimes.add((int) sendTime);
406 while (myResponseTimes.size() > 100) {
407 myResponseTimes.pop();
408 }
409
410 beforeProcessingNewMessageIn();
411 addActivity(new ActivityIncomingMessage(new Date(), getEncoding(), myParser.encode(response), EncodingCharacters.getInstance(response)));
412
413 } catch (HL7Exception e) {
414 ourLog.error("Failed to transmit message. ", e);
415 addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. " + e.getMessage()));
416 } catch (LLPException e) {
417 ourLog.error("Failed to transmit message. ", e);
418 addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. " + e.getMessage()));
419 } catch (IOException e) {
420 ourLog.error("Failed to transmit message. Shutting down interface. ", e);
421 addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. Shutting down interface. " + e.getMessage()));
422 OutboundConnection.this.stop();
423 }
424
425 } else {
426
427 ourLog.info("Skipping unknown message");
428
429 }
430
431 }
432
433 }
434
435
436 private void sendUpdate(final double complete) {
437 long now = System.currentTimeMillis();
438 long elapsed = now - myStartTime;
439 if (elapsed == 0) {
440 elapsed = 1;
441 }
442 long throughputPerSecond = (mySentMessages * 1000) / elapsed;
443 long avgSendTime = 0;
444 if (myResponseTimes.size() > 0) {
445 long total = 0;
446 for (Integer next : myResponseTimes) {
447 total += next;
448 }
449 avgSendTime = total / myResponseTimes.size();
450 }
451 final int avgSendTimeF = (int) avgSendTime;
452 final int throughputPerSecondF = (int) throughputPerSecond;
453 EventQueue.invokeLater(new Runnable() {
454 public void run() {
455 myTransmissionCallback.updateAvgResponseTimeMillis(avgSendTimeF);
456 myTransmissionCallback.updateAvgThroughputPerSecond(throughputPerSecondF);
457 }
458 });
459
460 EventQueue.invokeLater(new Runnable() {
461 public void run() {
462 try {
463 myTransmissionCallback.progressUpdate(complete);
464 } catch (OperationCancelRequestedException e) {
465 ourLog.info("Detected that transmission cancel was requested");
466 myCancelled = true;
467 }
468 }
469 });
470 }
471
472 }
473
474 }