View Javadoc
1   /**
2   The contents of this file are subject to the Mozilla Public License Version 1.1 
3   (the "License"); you may not use this file except in compliance with the License. 
4   You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
5   Software distributed under the License is distributed on an "AS IS" basis, 
6   WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
7   specific language governing rights and limitations under the License. 
8   
9   The Original Code is "ConnectionHub.java".  Description: 
10  "Provides access to shared HL7 Connections" 
11  
12  The Initial Developer of the Original Code is University Health Network. Copyright (C) 
13  2001.  All Rights Reserved. 
14  
15  Contributor(s): ______________________________________. 
16  
17  Alternatively, the contents of this file may be used under the terms of the 
18  GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
19  applicable instead of those above.  If you wish to allow use of your version of this 
20  file only under the terms of the GPL and not to allow others to use your version 
21  of this file under the MPL, indicate your decision by deleting  the provisions above 
22  and replace  them with the notice and other provisions required by the GPL License.  
23  If you do not delete the provisions above, a recipient may use your version of 
24  this file under either the MPL or the GPL. 
25   */
26  
27  package ca.uhn.hl7v2.app;
28  
29  import java.io.IOException;
30  import java.util.Collections;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ExecutorService;
36  
37  import ca.uhn.hl7v2.DefaultHapiContext;
38  import ca.uhn.hl7v2.HL7Exception;
39  import ca.uhn.hl7v2.HapiContext;
40  import ca.uhn.hl7v2.HapiContextSupport;
41  import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
42  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
43  import ca.uhn.hl7v2.parser.Parser;
44  import ca.uhn.hl7v2.util.ReflectionUtil;
45  import ca.uhn.hl7v2.util.SocketFactory;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  
49  /**
50   * <p>
51   * Provides access to shared HL7 Connections. The ConnectionHub has at most one connection to any
52   * given address at any time.
53   * </p>
54   * <p>
55   * <b>Synchronization Note:</b> This class should be safe to use in a multithreaded environment. A
56   * synchronization mutex is maintained for any given target host and port, so that if two threads
57   * are trying to connect to two separate destinations neither will block, but if two threads are
58   * trying to connect to the same destination, one will block until the other has finished trying.
59   * Use caution if this class is to be used in an environment where a very large (over 1000) number
60   * of target host/port destinations will be accessed at the same time.
61   * </p>
62   * 
63   * @author Bryan Tripp
64   */
65  public class ConnectionHub extends HapiContextSupport {
66  
67  	private static volatile ConnectionHub instance = null;
68  	private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
69  	/**
70  	 * Set a system property with this key to a string containing an integer larger than the default
71  	 * ("1000") if you need to connect to a very large number of targets at the same time in a
72  	 * multithreaded environment.
73  	 */
74  	public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
75  	private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<>();
76  	private final CountingMap<ConnectionData, Connection> connections;
77  
78  	/** Creates a new instance of ConnectionHub */
79  	private ConnectionHub(ExecutorService executorService) {
80  		this(new DefaultHapiContext(executorService));
81  	}
82  
83  	private ConnectionHub(HapiContext context) {
84  		super(context);
85  		connections = new CountingMap<ConnectionData, Connection>() {
86  
87  			@Override
88  			protected void dispose(Connection connection) {
89  				try {
90  					connection.close();
91  				} catch (IOException e) {
92  					throw new RuntimeException(e);
93  				}
94  			}
95  
96  			@Override
97  			protected Connection open(ConnectionData connectionData) throws Exception {
98  				return ConnectionFactory
99  						.open(connectionData, getHapiContext().getExecutorService());
100 			}
101 
102 		};
103 	}
104 
105 	public Set<? extends ConnectionData> allConnections() {
106 		return connections.keySet();
107 	}
108 
109 	/**
110 	 * @since 2.0
111 	 */
112 	public Connection attach(ConnectionData data) throws HL7Exception {
113 		try {
114 			Connection conn;
115 			// Disallow establishing same connection targets concurrently
116 			connectionMutexes.putIfAbsent(data.toString(), data.toString());
117 			String mutex = connectionMutexes.get(data.toString());
118 			synchronized (mutex) {
119 				discardConnectionIfStale(connections.get(data));
120 				// Create connection or increase counter
121 				conn = connections.put(data);
122 			}
123 			return conn;
124 		} catch (Exception e) {
125 			log.debug("Failed to attach", e);
126 			throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
127 					+ data.getPort() + "/" + data.getPort2(), e);
128 		}
129 	}
130 
131     /**
132      * Returns a Connection to the given address, opening this Connection if necessary. The given
133      * Parser will only be used if a new Connection is opened, so there is no guarantee that the
134      * Connection returned will be using the Parser you provide. If you need explicit access to the
135      * Parser the Connection is using, call <code>Connection.getParser()</code>.
136      *
137      * @since 2.1
138      */
139     public Connection attach(String host, int port, boolean tls) throws HL7Exception {
140         return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
141                 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
142                 .getSocketFactory(), false));
143     }
144 
145 	/**
146 	 * Returns a Connection to the given address, opening this Connection if necessary. The given
147 	 * Parser will only be used if a new Connection is opened, so there is no guarantee that the
148 	 * Connection returned will be using the Parser you provide. If you need explicit access to the
149 	 * Parser the Connection is using, call <code>Connection.getParser()</code>.
150 	 * 
151 	 * @since 2.2
152 	 */
153 	public Connection attachLazily(String host, int port, boolean tls) throws HL7Exception {
154 		return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
155 				getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
156 						.getSocketFactory(), true));
157 	}
158 
159     /**
160      * @since 2.0
161      */
162     public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
163         return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
164                 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
165                 getHapiContext().getSocketFactory(), false));
166     }
167 
168 	/**
169 	 * @since 2.2
170 	 */
171 	public Connection attachLazily(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
172 		return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
173 				.getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
174 				getHapiContext().getSocketFactory(), true));
175 	}
176 
177 	/**
178 	 * @since 2.0
179 	 */
180 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
181 			Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
182 		return attach(host, outboundPort, inboundPort, parser, llpClass, false);
183 	}
184 
185     /**
186      * @since 2.0
187      */
188     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
189                              Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
190         return attachLazily(host, outboundPort, inboundPort, parser, llpClass, false);
191     }
192 
193 	/**
194 	 * @since 2.0
195 	 */
196 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
197 		Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
198 		LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
199 		return attach(host, outboundPort, inboundPort, parser, llp, tls);
200 	}
201 
202     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
203                              Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
204         LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
205         return attachLazily(host, outboundPort, inboundPort, parser, llp, tls);
206     }
207 
208 	/**
209 	 * @since 2.0
210 	 */
211 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
212 			LowerLayerProtocol llp, boolean tls) throws HL7Exception {
213 		return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, false));
214 	}
215 
216     /**
217      * @since 2.2
218      */
219     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
220                              LowerLayerProtocol llp, boolean tls) throws HL7Exception {
221         return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, true));
222     }
223 
224 	/**
225 	 * @since 2.1
226 	 */
227 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
228                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
229 		return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, false));
230 	}
231 
232     /**
233      * @since 2.1
234      */
235     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
236                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
237         return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, true));
238     }
239 
240 	/**
241 	 * @since 2.1
242 	 */
243 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
244                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
245 		return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, false));
246 	}
247 
248     /**
249      * @since 2.1
250      */
251     public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
252                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
253         return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, true));
254     }
255 
256 	/**
257 	 * @since 2.1
258 	 */
259 	public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
260 		return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
261                 tls, hapiContext.getSocketFactory(), false));
262 	}
263 
264     /**
265      * @since 2.2
266      */
267     public Connection attachLazily(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
268         return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
269                 tls, hapiContext.getSocketFactory(), true));
270     }
271 
272 	/**
273 	 * @since 2.1
274 	 */
275 	public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
276 		return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
277                 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), false));
278 	}
279 
280     /**
281      * @since 2.2
282      */
283     public Connection attachLazily(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
284         return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
285                 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), true));
286     }
287 
288 	/**
289 	 * @since 1.2
290 	 */
291 	public Connection attach(String host, int port, Parser parser,
292 			Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
293 		return attach(host, port, parser, llpClass, false);
294 	}
295 
296 	/**
297 	 * @since 2.0
298 	 */
299 	public Connection attach(String host, int port, Parser parser,
300 			Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
301 		return attach(host, port, 0, parser, llpClass, tls);
302 	}
303 
304 	/**
305 	 * @since 2.0
306 	 */
307 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
308 			throws HL7Exception {
309 		return attach(host, port, 0, parser, llp, false);
310 	}
311 
312     /**
313      * @since 2.2
314      */
315     public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp)
316             throws HL7Exception {
317         return attachLazily(host, port, 0, parser, llp, false);
318     }
319 
320 
321 	/**
322 	 * @since 2.0
323 	 */
324 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
325 			boolean tls) throws HL7Exception {
326 		return attach(host, port, 0, parser, llp, tls);
327 	}
328 
329     /**
330      * @since 2.0
331      */
332     public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
333                              boolean tls) throws HL7Exception {
334         return attachLazily(host, port, 0, parser, llp, tls);
335     }
336 
337 	/**
338 	 * Informs the ConnectionHub that you are done with the given Connection - if no other code is
339 	 * using it, it will be closed, so you should not attempt to use a Connection after detaching
340 	 * from it. If the connection is not enlisted, this method does nothing.
341 	 */
342 	public void detach(Connection c) {
343 		ConnectionData cd = connections.find(c);
344 		if (cd != null)
345 			connections.remove(cd);
346 	}
347 
348 	/**
349 	 * Closes and discards the given Connection so that it can not be returned in subsequent calls
350 	 * to attach(). This method is to be used when there is a problem with a Connection, e.g. socket
351 	 * connection closed by remote host.
352 	 */
353 	public void discard(Connection c) {
354 		ConnectionData cd = connections.find(c);
355 		if (cd != null)
356 			connections.removeAllOf(cd);
357 	}
358 
359     /**
360      * Closes and discards all connections.
361      */
362 	public void discardAll() {
363 		for (ConnectionData cd : allConnections()) {
364 			connections.removeAllOf(cd);
365 		}
366 	}
367 
368 	private void discardConnectionIfStale(Connection conn) {
369 		if (conn != null && !conn.isOpen()) {
370 			log.info("Discarding connection which appears to be closed. Remote addr: {}",
371 					conn.getRemoteAddress());
372 			discard(conn);
373 		}
374 	}
375 
376 	public Connection getKnownConnection(ConnectionData key) {
377 		return connections.get(key);
378 	}
379 
380 	public boolean isOpen(ConnectionData key) {
381 		return getKnownConnection(key).isOpen();
382 	}
383 
384 	/**
385 	 * Returns the singleton instance of ConnectionHub
386 	 * 
387 	 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
388 	 *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
389 	 */
390 	public static ConnectionHub getInstance() {
391 		return getInstance(DefaultExecutorService.getDefaultService());
392 	}
393 
394 	/**
395 	 * Returns the singleton instance of ConnectionHub.
396 	 * 
397 	 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
398 	 *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
399 	 */
400 	public synchronized static ConnectionHub getInstance(ExecutorService service) {
401 		if (instance == null || service.isShutdown()) {
402 			instance = new ConnectionHub(service);
403 		}
404 		return instance;
405 	}
406 
407 	/**
408 	 * Returns the singleton instance of ConnectionHub.
409 	 * 
410 	 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
411 	 *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
412 	 */
413 	public static ConnectionHub getInstance(HapiContext context) {
414 		if (instance == null || context.getExecutorService().isShutdown()) {
415 			instance = new ConnectionHub(context);
416 		}
417 		return instance;
418 	}
419 
420 	/**
421 	 * <p>
422 	 * Returns a new (non-singleton) instance of the ConnectionHub which uses the given executor
423 	 * service.
424 	 * </p>
425 	 * <p>
426 	 * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a>
427 	 * for an example of how to use ConnectionHub.
428 	 * </p>
429 	 */
430 	public synchronized static ConnectionHub getNewInstance(HapiContext context) {
431 		return new ConnectionHub(context);
432 	}
433 
434 	/**
435 	 * @deprecated default executor service is shut down automatically
436 	 */
437 	public static void shutdown() {
438 		ConnectionHub hub = getInstance();
439 		if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
440 			hub.getHapiContext().getExecutorService().shutdown();
441 			instance = null;
442 		}
443 	}
444 
445 	/**
446 	 * Helper class that implements a map that increases/decreases a counter when an entry is
447 	 * added/removed. It is furthermore intended that an entry's value is derived from its key.
448 	 * 
449 	 * @param <K> key class
450 	 * @param <D> managed value class
451 	 */
452 	private abstract static class CountingMap<K, D> {
453 		private final Map<K, Count> content;
454 
455 		public CountingMap() {
456 			super();
457 			content = new ConcurrentHashMap<>();
458 		}
459 
460 		protected abstract void dispose(D value);
461 
462 		public K find(D value) {
463 			for (Map.Entry<K, Count> entry : content.entrySet()) {
464 				if (entry.getValue().getValue().equals(value)) {
465 					return entry.getKey();
466 				}
467 			}
468 			return null;
469 		}
470 
471 		public D get(K key) {
472 			return content.containsKey(key) ? content.get(key).getValue() : null;
473 		}
474 
475 		public Set<K> keySet() {
476 			return Collections.unmodifiableSet(content.keySet());
477 		}
478 
479 		protected abstract D open(K key) throws Exception;
480 
481 		/**
482 		 * If the key exists, the counter is increased. Otherwise, a value is created, and the
483 		 * key/value pair is added to the map.
484 		 */
485 		public D put(K key) throws Exception {
486 			if (content.containsKey(key)) {
487 				return content.put(key, content.get(key).increase()).getValue();
488 			} else {
489 				Count c = new Count(open(key));
490 				content.put(key, c);
491 				return c.getValue();
492 			}
493 		}
494 
495 		/**
496 		 * If the counter of the key/value is greater than one, the counter is decreased. Otherwise,
497 		 * the entry is removed and the value is cleaned up.
498 		 */
499 		public D remove(K key) {
500 			Count pair = content.get(key);
501 			if (pair == null)
502 				return null;
503 			if (pair.isLast()) {
504 				return removeAllOf(key);
505 			}
506 			return content.put(key, content.get(key).decrease()).getValue();
507 		}
508 
509 		/**
510 		 * The key/value entry is removed and the value is cleaned up.
511 		 */
512 		public D removeAllOf(K key) {
513 			D removed = content.remove(key).value;
514 			dispose(removed);
515 			return removed;
516 		}
517 
518 		private class Count {
519 			private final int count;
520 			private final D value;
521 
522 			public Count(D value) {
523 				this(value, 1);
524 			}
525 
526 			private Count(D value, int number) {
527 				this.value = value;
528 				this.count = number;
529 			}
530 
531 			Count decrease() {
532 				return !isLast() ? new Count(value, count - 1) : null;
533 			}
534 
535 			public D getValue() {
536 				return value;
537 			}
538 
539 			Count increase() {
540 				return new Count(value, count + 1);
541 			}
542 
543 			boolean isLast() {
544 				return count == 1;
545 			}
546 
547 		}
548 
549 	}
550 
551 }