1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  package ca.uhn.hl7v2.app;
28  
29  import java.io.IOException;
30  import java.util.Collections;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ExecutorService;
36  
37  import ca.uhn.hl7v2.DefaultHapiContext;
38  import ca.uhn.hl7v2.HL7Exception;
39  import ca.uhn.hl7v2.HapiContext;
40  import ca.uhn.hl7v2.HapiContextSupport;
41  import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
42  import ca.uhn.hl7v2.llp.LowerLayerProtocol;
43  import ca.uhn.hl7v2.parser.Parser;
44  import ca.uhn.hl7v2.util.ReflectionUtil;
45  import ca.uhn.hl7v2.util.SocketFactory;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  public class ConnectionHub extends HapiContextSupport {
66  
67  	private static volatile ConnectionHub instance = null;
68  	private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
69  	
70  
71  
72  
73  
74  	public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
75  	private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<>();
76  	private final CountingMap<ConnectionData, Connection> connections;
77  
78  	
79  	private ConnectionHub(ExecutorService executorService) {
80  		this(new DefaultHapiContext(executorService));
81  	}
82  
83  	private ConnectionHub(HapiContext context) {
84  		super(context);
85  		connections = new CountingMap<ConnectionData, Connection>() {
86  
87  			@Override
88  			protected void dispose(Connection connection) {
89  				try {
90  					connection.close();
91  				} catch (IOException e) {
92  					throw new RuntimeException(e);
93  				}
94  			}
95  
96  			@Override
97  			protected Connection open(ConnectionData connectionData) throws Exception {
98  				return ConnectionFactory
99  						.open(connectionData, getHapiContext().getExecutorService());
100 			}
101 
102 		};
103 	}
104 
105 	public Set<? extends ConnectionData> allConnections() {
106 		return connections.keySet();
107 	}
108 
109 	
110 
111 
112 	public Connection attach(ConnectionData data) throws HL7Exception {
113 		try {
114 			Connection conn;
115 			
116 			connectionMutexes.putIfAbsent(data.toString(), data.toString());
117 			String mutex = connectionMutexes.get(data.toString());
118 			synchronized (mutex) {
119 				discardConnectionIfStale(connections.get(data));
120 				
121 				conn = connections.put(data);
122 			}
123 			return conn;
124 		} catch (Exception e) {
125 			log.debug("Failed to attach", e);
126 			throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
127 					+ data.getPort() + "/" + data.getPort2(), e);
128 		}
129 	}
130 
131     
132 
133 
134 
135 
136 
137 
138 
139     public Connection attach(String host, int port, boolean tls) throws HL7Exception {
140         return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
141                 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
142                 .getSocketFactory(), false));
143     }
144 
145 	
146 
147 
148 
149 
150 
151 
152 
153 	public Connection attachLazily(String host, int port, boolean tls) throws HL7Exception {
154 		return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
155 				getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
156 						.getSocketFactory(), true));
157 	}
158 
159     
160 
161 
162     public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
163         return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
164                 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
165                 getHapiContext().getSocketFactory(), false));
166     }
167 
168 	
169 
170 
171 	public Connection attachLazily(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
172 		return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
173 				.getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
174 				getHapiContext().getSocketFactory(), true));
175 	}
176 
177 	
178 
179 
180 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
181 			Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
182 		return attach(host, outboundPort, inboundPort, parser, llpClass, false);
183 	}
184 
185     
186 
187 
188     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
189                              Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
190         return attachLazily(host, outboundPort, inboundPort, parser, llpClass, false);
191     }
192 
193 	
194 
195 
196 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
197 		Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
198 		LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
199 		return attach(host, outboundPort, inboundPort, parser, llp, tls);
200 	}
201 
202     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
203                              Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
204         LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
205         return attachLazily(host, outboundPort, inboundPort, parser, llp, tls);
206     }
207 
208 	
209 
210 
211 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
212 			LowerLayerProtocol llp, boolean tls) throws HL7Exception {
213 		return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, false));
214 	}
215 
216     
217 
218 
219     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
220                              LowerLayerProtocol llp, boolean tls) throws HL7Exception {
221         return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, true));
222     }
223 
224 	
225 
226 
227 	public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
228                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
229 		return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, false));
230 	}
231 
232     
233 
234 
235     public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
236                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
237         return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, true));
238     }
239 
240 	
241 
242 
243 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
244                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
245 		return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, false));
246 	}
247 
248     
249 
250 
251     public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
252                              boolean tls, SocketFactory socketFactory) throws HL7Exception {
253         return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, true));
254     }
255 
256 	
257 
258 
259 	public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
260 		return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
261                 tls, hapiContext.getSocketFactory(), false));
262 	}
263 
264     
265 
266 
267     public Connection attachLazily(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
268         return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
269                 tls, hapiContext.getSocketFactory(), true));
270     }
271 
272 	
273 
274 
275 	public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
276 		return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
277                 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), false));
278 	}
279 
280     
281 
282 
283     public Connection attachLazily(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
284         return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
285                 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), true));
286     }
287 
288 	
289 
290 
291 	public Connection attach(String host, int port, Parser parser,
292 			Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
293 		return attach(host, port, parser, llpClass, false);
294 	}
295 
296 	
297 
298 
299 	public Connection attach(String host, int port, Parser parser,
300 			Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
301 		return attach(host, port, 0, parser, llpClass, tls);
302 	}
303 
304 	
305 
306 
307 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
308 			throws HL7Exception {
309 		return attach(host, port, 0, parser, llp, false);
310 	}
311 
312     
313 
314 
315     public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp)
316             throws HL7Exception {
317         return attachLazily(host, port, 0, parser, llp, false);
318     }
319 
320 
321 	
322 
323 
324 	public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
325 			boolean tls) throws HL7Exception {
326 		return attach(host, port, 0, parser, llp, tls);
327 	}
328 
329     
330 
331 
332     public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
333                              boolean tls) throws HL7Exception {
334         return attachLazily(host, port, 0, parser, llp, tls);
335     }
336 
337 	
338 
339 
340 
341 
342 	public void detach(Connection c) {
343 		ConnectionData cd = connections.find(c);
344 		if (cd != null)
345 			connections.remove(cd);
346 	}
347 
348 	
349 
350 
351 
352 
353 	public void discard(Connection c) {
354 		ConnectionData cd = connections.find(c);
355 		if (cd != null)
356 			connections.removeAllOf(cd);
357 	}
358 
359     
360 
361 
362 	public void discardAll() {
363 		for (ConnectionData cd : allConnections()) {
364 			connections.removeAllOf(cd);
365 		}
366 	}
367 
368 	private void discardConnectionIfStale(Connection conn) {
369 		if (conn != null && !conn.isOpen()) {
370 			log.info("Discarding connection which appears to be closed. Remote addr: {}",
371 					conn.getRemoteAddress());
372 			discard(conn);
373 		}
374 	}
375 
376 	public Connection getKnownConnection(ConnectionData key) {
377 		return connections.get(key);
378 	}
379 
380 	public boolean isOpen(ConnectionData key) {
381 		return getKnownConnection(key).isOpen();
382 	}
383 
384 	
385 
386 
387 
388 
389 
390 	public static ConnectionHub getInstance() {
391 		return getInstance(DefaultExecutorService.getDefaultService());
392 	}
393 
394 	
395 
396 
397 
398 
399 
400 	public synchronized static ConnectionHub getInstance(ExecutorService service) {
401 		if (instance == null || service.isShutdown()) {
402 			instance = new ConnectionHub(service);
403 		}
404 		return instance;
405 	}
406 
407 	
408 
409 
410 
411 
412 
413 	public static ConnectionHub getInstance(HapiContext context) {
414 		if (instance == null || context.getExecutorService().isShutdown()) {
415 			instance = new ConnectionHub(context);
416 		}
417 		return instance;
418 	}
419 
420 	
421 
422 
423 
424 
425 
426 
427 
428 
429 
430 	public synchronized static ConnectionHub getNewInstance(HapiContext context) {
431 		return new ConnectionHub(context);
432 	}
433 
434 	
435 
436 
437 	public static void shutdown() {
438 		ConnectionHub hub = getInstance();
439 		if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
440 			hub.getHapiContext().getExecutorService().shutdown();
441 			instance = null;
442 		}
443 	}
444 
445 	
446 
447 
448 
449 
450 
451 
452 	private abstract static class CountingMap<K, D> {
453 		private final Map<K, Count> content;
454 
455 		public CountingMap() {
456 			super();
457 			content = new ConcurrentHashMap<>();
458 		}
459 
460 		protected abstract void dispose(D value);
461 
462 		public K find(D value) {
463 			for (Map.Entry<K, Count> entry : content.entrySet()) {
464 				if (entry.getValue().getValue().equals(value)) {
465 					return entry.getKey();
466 				}
467 			}
468 			return null;
469 		}
470 
471 		public D get(K key) {
472 			return content.containsKey(key) ? content.get(key).getValue() : null;
473 		}
474 
475 		public Set<K> keySet() {
476 			return Collections.unmodifiableSet(content.keySet());
477 		}
478 
479 		protected abstract D open(K key) throws Exception;
480 
481 		
482 
483 
484 
485 		public D put(K key) throws Exception {
486 			if (content.containsKey(key)) {
487 				return content.put(key, content.get(key).increase()).getValue();
488 			} else {
489 				Count c = new Count(open(key));
490 				content.put(key, c);
491 				return c.getValue();
492 			}
493 		}
494 
495 		
496 
497 
498 
499 		public D remove(K key) {
500 			Count pair = content.get(key);
501 			if (pair == null)
502 				return null;
503 			if (pair.isLast()) {
504 				return removeAllOf(key);
505 			}
506 			return content.put(key, content.get(key).decrease()).getValue();
507 		}
508 
509 		
510 
511 
512 		public D removeAllOf(K key) {
513 			D removed = content.remove(key).value;
514 			dispose(removed);
515 			return removed;
516 		}
517 
518 		private class Count {
519 			private final int count;
520 			private final D value;
521 
522 			public Count(D value) {
523 				this(value, 1);
524 			}
525 
526 			private Count(D value, int number) {
527 				this.value = value;
528 				this.count = number;
529 			}
530 
531 			Count decrease() {
532 				return !isLast() ? new Count(value, count - 1) : null;
533 			}
534 
535 			public D getValue() {
536 				return value;
537 			}
538 
539 			Count increase() {
540 				return new Count(value, count + 1);
541 			}
542 
543 			boolean isLast() {
544 				return count == 1;
545 			}
546 
547 		}
548 
549 	}
550 
551 }