View Javadoc
1   /**
2    * The contents of this file are subject to the Mozilla Public License Version 1.1
3    * (the "License"); you may not use this file except in compliance with the License.
4    * You may obtain a copy of the License at http://www.mozilla.org/MPL/
5    * Software distributed under the License is distributed on an "AS IS" basis,
6    * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
7    * specific language governing rights and limitations under the License.
8    *
9    * The Original Code is ""  Description:
10   * ""
11   *
12   * The Initial Developer of the Original Code is University Health Network. Copyright (C)
13   * 2001.  All Rights Reserved.
14   *
15   * Contributor(s): ______________________________________.
16   *
17   * Alternatively, the contents of this file may be used under the terms of the
18   * GNU General Public License (the  "GPL"), in which case the provisions of the GPL are
19   * applicable instead of those above.  If you wish to allow use of your version of this
20   * file only under the terms of the GPL and not to allow others to use your version
21   * of this file under the MPL, indicate your decision by deleting  the provisions above
22   * and replace  them with the notice and other provisions required by the GPL License.
23   * If you do not delete the provisions above, a recipient may use your version of
24   * this file under either the MPL or the GPL.
25   */
26  package ca.uhn.hl7v2.testpanel.model.conn;
27  
28  import java.awt.EventQueue;
29  import java.io.IOException;
30  import java.io.StringReader;
31  import java.io.StringWriter;
32  import java.util.Date;
33  import java.util.LinkedList;
34  import java.util.concurrent.CountDownLatch;
35  import java.util.concurrent.TimeUnit;
36  
37  import jakarta.xml.bind.JAXB;
38  import jakarta.xml.bind.annotation.XmlAccessType;
39  import jakarta.xml.bind.annotation.XmlAccessorType;
40  import jakarta.xml.bind.annotation.XmlType;
41  
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import ca.uhn.hl7v2.HL7Exception;
46  import ca.uhn.hl7v2.app.Connection;
47  import ca.uhn.hl7v2.app.ConnectionHub;
48  import ca.uhn.hl7v2.llp.LLPException;
49  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
50  import ca.uhn.hl7v2.model.Message;
51  import ca.uhn.hl7v2.parser.EncodingCharacters;
52  import ca.uhn.hl7v2.parser.Parser;
53  import ca.uhn.hl7v2.testpanel.model.ActivityIncomingMessage;
54  import ca.uhn.hl7v2.testpanel.model.ActivityInfo;
55  import ca.uhn.hl7v2.testpanel.model.ActivityInfoError;
56  import ca.uhn.hl7v2.testpanel.model.ActivityOutgoingMessage;
57  import ca.uhn.hl7v2.testpanel.model.msg.AbstractMessage;
58  import ca.uhn.hl7v2.testpanel.model.msg.Hl7V2MessageBase;
59  import ca.uhn.hl7v2.testpanel.model.msg.Hl7V2MessageCollection;
60  import ca.uhn.hl7v2.testpanel.ui.IDestroyable;
61  import ca.uhn.hl7v2.testpanel.util.IProgressCallback.OperationCancelRequestedException;
62  import ca.uhn.hl7v2.testpanel.util.ISendProgressCallback;
63  import ca.uhn.hl7v2.util.SocketFactory;
64  
65  @XmlAccessorType(XmlAccessType.FIELD)
66  @XmlType(name = "OutboundConnection")
67  public class OutboundConnection extends AbstractConnection implements IDestroyable {
68  
69  	private static final Logger ourLog = LoggerFactory.getLogger(OutboundConnection.class);
70  
71  	private transient Connection myConnection;
72  	private transient ConnectionMonitorThread myConnectionMonitorThread;
73  	private transient Parser myParser;
74  	private transient MessageSenderThread myMessageSenderThread;
75  
76  
77  	public OutboundConnection() {
78  	}
79  
80  
81  	@Override
82  	public String exportConfigToXml() {
83  		StringWriter writer = new StringWriter();
84  		JAXB.marshal(this, writer);
85  		return writer.toString();
86  	}
87  
88  
89  	public static OutboundConnection fromXml(String theXml) {
90  		return JAXB.unmarshal(new StringReader(theXml), OutboundConnection.class);
91  	}
92  
93  
94  	public synchronized void sendMessages(Hl7V2MessageCollection theMessages, ISendProgressCallback theTransmissionCallback) {
95  		if (myMessageSenderThread != null) {
96  			throw new IllegalStateException("Already sending messages");
97  		}
98  		myMessageSenderThread = new MessageSenderThread(theMessages, theTransmissionCallback);
99  		myMessageSenderThread.start();
100 	}
101 
102 
103 	public void start() {
104 		super.start();
105 
106 		if (myConnectionMonitorThread != null) {
107 			return;
108 		}
109 
110 		myParser = createParser();
111 
112 		ourLog.info("Starting up outgoing interface {}", getName());
113 
114 		myConnectionMonitorThread = new ConnectionMonitorThread();
115 		setStatus(StatusEnum.TRYING_TO_START);
116 		myConnectionMonitorThread.start();
117 	}
118 
119 
120 	public void stop() {
121 		super.stop();
122 
123 		ourLog.info("Shutting down outgoing interface {}", getName());
124 
125 		ConnectionMonitorThread thread = myConnectionMonitorThread;
126 		myConnectionMonitorThread = null;
127 
128 		if (thread != null) {
129 			thread.interrupt();
130 		}
131 
132 	}
133 
134 	private class ConnectionMonitorThread extends Thread {
135 
136 		private CountDownLatch myStartupLatch = new CountDownLatch(1);
137 
138 
139 		private void doRun() throws LLPException {
140 			LowerLayerProtocol llpClass = null;
141 			llpClass = createLlp();
142 
143 			boolean tls = isTls();
144 
145 			String desc = OutboundConnection.this.isNameIsExplicitlySet() ? OutboundConnection.this.getName() + " (" + OutboundConnection.this.createDescription() + ")" : OutboundConnection.this.createDescription();
146 			ourLog.info("Starting outbound interface " + desc);
147 
148 			Connection connection = null;
149 			while (myConnectionMonitorThread == this) {
150 
151 				if (getStatus() == StatusEnum.TRYING_TO_START) {
152 					String msg = "Attempting outbound connection to " + desc;
153 					ourLog.info(msg);
154 					addActivityInfoInSwingThread(msg);
155 
156 					try {
157 
158 						SocketFactory socketFactory = getSocketFactory();
159 
160 						if (isDualPort()) {
161 							connection = ConnectionHub.getInstance().attach(getHost(), getOutgoingPort(), getIncomingOrSinglePort(), myParser, llpClass, tls, socketFactory);
162 						} else {
163 							connection = ConnectionHub.getInstance().attach(getHost(), getIncomingOrSinglePort(), myParser, llpClass, tls, socketFactory);
164 						}
165 
166 						myConnection = connection;
167 
168 						msg = "Successfully connected to " + createDescription();
169 						ourLog.info(msg);
170 						addActivityInfoInSwingThread(msg);
171 
172 						myStartupLatch.countDown();
173 
174 					} catch (HL7Exception e) {
175 						Throwable ex = e;
176 						if (e.getCause() != null) {
177 							ex = e.getCause();
178 						}
179 
180 						ourLog.warn("Failed to connect to " + createDescription() + " - Message was " + ex.getMessage());
181 						addActivity(new ActivityInfoError(new Date(), "Failed to connect to " + createDescription() + " - " + ex.getMessage()));
182 					}
183 
184 					if (myConnection != null) {
185 						if (myConnection.isOpen()) {
186 							setStatus(StatusEnum.STARTED);
187 							setStatusLine("Connected");
188 						} else {
189 							setStatus(StatusEnum.TRYING_TO_START);
190 							setStatusLine("Lost connection, retrying...");
191 						}
192 					}
193 					if (myConnection == null) {
194 						setStatus(StatusEnum.TRYING_TO_START);
195 						setStatusLine("Trying to connect...");
196 					}
197 
198 					try {
199 						Thread.sleep(1000);
200 					} catch (InterruptedException e) {
201 						// ignore
202 					}
203 
204 				}
205 
206 				try {
207 					Thread.sleep(250);
208 				} catch (InterruptedException e) {
209 					// ignore
210 				}
211 
212 			}
213 
214 		}
215 
216 
217 		/**
218 		 * {@inheritDoc}
219 		 */
220 		@Override
221 		public void run() {
222 			setStatusLine("Trying to connect...");
223 
224 			try {
225 
226 				doRun();
227 				setStatusLine("Connection stopped");
228 
229 			} catch (Throwable e) {
230 				ourLog.error("Connection failed with an unexpected error!", e);
231 				setStatusLine("Failed with an error: " + e.getMessage());
232 			} finally {
233 				if (myConnection != null) {
234 					ConnectionHub.getInstance().discard(myConnection);
235 				}
236 				myConnectionMonitorThread = null;
237 				setStatus(StatusEnum.STOPPED);
238 			}
239 		}
240 
241 
242 		/**
243 		 * @return Returns true if we got a connection
244 		 */
245 		public boolean waitUntilWeHaveAConnection() {
246 			try {
247 				return myStartupLatch.await(5000, TimeUnit.MILLISECONDS);
248 			} catch (InterruptedException e) {
249 				return false;
250 			}
251 		}
252 
253 	}
254 
255 	public class MessageSenderThread extends Thread {
256 
257 		private Hl7V2MessageCollection myMessages;
258 		private ISendProgressCallback myTransmissionCallback;
259 		private boolean myCancelled;
260 		private long myStartTime;
261 		private StatusEnum myInitialStatus;
262 		private int myTotalMessages;
263 		private int mySentMessages;
264 		private LinkedList<Integer> myResponseTimes = new LinkedList<Integer>();
265 		private long myLastUpdate = 0;
266 
267 
268 		public MessageSenderThread(Hl7V2MessageCollection theMessages, ISendProgressCallback theTransmissionCallback) {
269 			myMessages = theMessages;
270 			myTransmissionCallback = theTransmissionCallback;
271 		}
272 
273 
274 		@Override
275 		public void run() {
276 
277 			EventQueue.invokeLater(new Runnable() {
278 				public void run() {
279 					myTransmissionCallback.activityStarted();
280 				}
281 			});
282 
283 			try {
284 
285 				if (!doStart()) {
286 					return;
287 				}
288 
289 				int sendNumberOfTimes = myMessages.getSendNumberOfTimes();
290 
291 				myTotalMessages = myMessages.countMessagesOfType(Hl7V2MessageBase.class) * sendNumberOfTimes;
292 				mySentMessages = 0;
293 
294 				myStartTime = System.currentTimeMillis();
295 				for (int curRep = 1; curRep <= sendNumberOfTimes; curRep++) {
296 					doSend();
297 				}
298 
299 				sendUpdate(1.0);
300 				
301 				doStop();
302 
303 			} finally {
304 				EventQueue.invokeLater(new Runnable() {
305 					public void run() {
306 						myTransmissionCallback.activityStopped();
307 					}
308 				});
309 				synchronized (OutboundConnection.this) {
310 					myMessageSenderThread = null;
311 				}
312 
313 				ourLog.info("Transmission thread shutting down");
314 			}
315 
316 		}
317 
318 
319 		private void doStop() {
320 			long delay = System.currentTimeMillis() - myStartTime;
321 			int i = myTotalMessages;
322 
323 			StringBuilder b = new StringBuilder();
324 			b.append("Sent ");
325 			b.append(i);
326 			b.append(" message");
327 			b.append((i != 1 ? "s" : ""));
328 			b.append(" in ");
329 			b.append(delay);
330 			b.append("ms");
331 			if (mySentMessages > 0) {
332 				b.append("<br/>");
333 				b.append("Average: ");
334 				b.append(delay / mySentMessages);
335 				b.append("ms / message");
336 			}
337 			
338 			addActivity(new ActivityInfo(new Date(), b.toString()));
339 
340 			if (myInitialStatus != StatusEnum.STARTED) {
341 				OutboundConnection.this.stop();
342 			}
343 		}
344 
345 
346 		private boolean doStart() {
347 			myInitialStatus = getStatus();
348 			if (myInitialStatus != StatusEnum.STARTED) {
349 				OutboundConnection.this.start();
350 			}
351 
352 			if (myConnectionMonitorThread == null) {
353 				throw new IllegalStateException("Interface not started");
354 			}
355 
356 			if (getStatus() != StatusEnum.STARTED) {
357 				ourLog.info("Waiting for interface {} to start...", OutboundConnection.this.getName());
358 				addActivity(new ActivityInfo(new Date(), "Starting interface \"" + OutboundConnection.this.getName() + "\"..."));
359 
360 				boolean gotConnection = myConnectionMonitorThread.waitUntilWeHaveAConnection();
361 				if (!gotConnection) {
362 					ourLog.info("Failed to connect to {}, shutting down interface and aborting send", createDescription());
363 					addActivity(new ActivityInfoError(new Date(), "Failed to connect to interface. Aborting send."));
364 					OutboundConnection.this.stop();
365 					return false;
366 				}
367 			}
368 
369 			return true;
370 		}
371 
372 
373 		private void doSend() {
374 			int i = 0;
375 			for (AbstractMessage<?> abstractMessage : myMessages.getMessages()) {
376 				if (myCancelled) {
377 					return;
378 				}
379 
380 				final double complete = (((double) mySentMessages) / myTotalMessages);
381 				long now = System.currentTimeMillis();
382 				long elapsedSinceLastUpdate = now - myLastUpdate;
383 				if (elapsedSinceLastUpdate > 1000) {
384 					sendUpdate(complete);
385 					myLastUpdate = now;
386 				}
387 
388 				i++;
389 
390 				if (abstractMessage instanceof Hl7V2MessageBase) {
391 
392 					Message msg = ((Hl7V2MessageBase) abstractMessage).getParsedMessage();
393 					ourLog.info("Sending message " + i + "/" + myTotalMessages + " of type " + msg.getClass());
394 					try {
395 
396 						beforeProcessingNewMessageOut();
397 						addActivity(new ActivityOutgoingMessage(new Date(), getEncoding(), myParser.encode(msg), EncodingCharacters.getInstance(msg)));
398 
399 						long beforeSend = now;
400 						Message response = myConnection.getInitiator().sendAndReceive(msg);
401 
402 						mySentMessages++;
403 
404 						long sendTime = now - beforeSend;
405 						myResponseTimes.add((int) sendTime);
406 						while (myResponseTimes.size() > 100) {
407 							myResponseTimes.pop();
408 						}
409 
410 						beforeProcessingNewMessageIn();
411 						addActivity(new ActivityIncomingMessage(new Date(), getEncoding(), myParser.encode(response), EncodingCharacters.getInstance(response)));
412 
413 					} catch (HL7Exception e) {
414 						ourLog.error("Failed to transmit message. ", e);
415 						addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. " + e.getMessage()));
416 					} catch (LLPException e) {
417 						ourLog.error("Failed to transmit message. ", e);
418 						addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. " + e.getMessage()));
419 					} catch (IOException e) {
420 						ourLog.error("Failed to transmit message. Shutting down interface. ", e);
421 						addActivity(new ActivityInfoError(new Date(), "Failed to transmit message. Shutting down interface. " + e.getMessage()));
422 						OutboundConnection.this.stop();
423 					}
424 
425 				} else {
426 
427 					ourLog.info("Skipping unknown message");
428 
429 				}
430 
431 			}
432 
433 		}
434 
435 
436 		private void sendUpdate(final double complete) {
437 			long now = System.currentTimeMillis();
438 	        long elapsed = now - myStartTime;
439 	        if (elapsed == 0) {
440 	        	elapsed = 1;
441 	        }
442 	        long throughputPerSecond = (mySentMessages * 1000) / elapsed;
443 	        long avgSendTime = 0;
444 	        if (myResponseTimes.size() > 0) {
445 	        	long total = 0;
446 	        	for (Integer next : myResponseTimes) {
447 	        		total += next;
448 	        	}
449 	        	avgSendTime = total / myResponseTimes.size();
450 	        }
451 	        final int avgSendTimeF = (int) avgSendTime;
452 	        final int throughputPerSecondF = (int) throughputPerSecond;
453 	        EventQueue.invokeLater(new Runnable() {
454 	        	public void run() {
455 	        		myTransmissionCallback.updateAvgResponseTimeMillis(avgSendTimeF);
456 	        		myTransmissionCallback.updateAvgThroughputPerSecond(throughputPerSecondF);
457 	        	}
458 	        });
459 	        
460 	        EventQueue.invokeLater(new Runnable() {
461 	        	public void run() {
462 	        		try {
463 	        			myTransmissionCallback.progressUpdate(complete);
464 	        		} catch (OperationCancelRequestedException e) {
465 	        			ourLog.info("Detected that transmission cancel was requested");
466 	        			myCancelled = true;
467 	        		}
468 	        	}
469 	        });
470         }
471 
472 	}
473 
474 }