1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package ca.uhn.hl7v2.app;
28
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ExecutorService;
36
37 import ca.uhn.hl7v2.DefaultHapiContext;
38 import ca.uhn.hl7v2.HL7Exception;
39 import ca.uhn.hl7v2.HapiContext;
40 import ca.uhn.hl7v2.HapiContextSupport;
41 import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
42 import ca.uhn.hl7v2.llp.LowerLayerProtocol;
43 import ca.uhn.hl7v2.parser.Parser;
44 import ca.uhn.hl7v2.util.ReflectionUtil;
45 import ca.uhn.hl7v2.util.SocketFactory;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class ConnectionHub extends HapiContextSupport {
66
67 private static volatile ConnectionHub instance = null;
68 private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
69
70
71
72
73
74 public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
75 private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<>();
76 private final CountingMap<ConnectionData, Connection> connections;
77
78
79 private ConnectionHub(ExecutorService executorService) {
80 this(new DefaultHapiContext(executorService));
81 }
82
83 private ConnectionHub(HapiContext context) {
84 super(context);
85 connections = new CountingMap<ConnectionData, Connection>() {
86
87 @Override
88 protected void dispose(Connection connection) {
89 try {
90 connection.close();
91 } catch (IOException e) {
92 throw new RuntimeException(e);
93 }
94 }
95
96 @Override
97 protected Connection open(ConnectionData connectionData) throws Exception {
98 return ConnectionFactory
99 .open(connectionData, getHapiContext().getExecutorService());
100 }
101
102 };
103 }
104
105 public Set<? extends ConnectionData> allConnections() {
106 return connections.keySet();
107 }
108
109
110
111
112 public Connection attach(ConnectionData data) throws HL7Exception {
113 try {
114 Connection conn;
115
116 connectionMutexes.putIfAbsent(data.toString(), data.toString());
117 String mutex = connectionMutexes.get(data.toString());
118 synchronized (mutex) {
119 discardConnectionIfStale(connections.get(data));
120
121 conn = connections.put(data);
122 }
123 return conn;
124 } catch (Exception e) {
125 log.debug("Failed to attach", e);
126 throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
127 + data.getPort() + "/" + data.getPort2(), e);
128 }
129 }
130
131
132
133
134
135
136
137
138
139 public Connection attach(String host, int port, boolean tls) throws HL7Exception {
140 return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
141 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
142 .getSocketFactory(), false));
143 }
144
145
146
147
148
149
150
151
152
153 public Connection attachLazily(String host, int port, boolean tls) throws HL7Exception {
154 return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
155 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
156 .getSocketFactory(), true));
157 }
158
159
160
161
162 public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
163 return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
164 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
165 getHapiContext().getSocketFactory(), false));
166 }
167
168
169
170
171 public Connection attachLazily(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
172 return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
173 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
174 getHapiContext().getSocketFactory(), true));
175 }
176
177
178
179
180 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
181 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
182 return attach(host, outboundPort, inboundPort, parser, llpClass, false);
183 }
184
185
186
187
188 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
189 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
190 return attachLazily(host, outboundPort, inboundPort, parser, llpClass, false);
191 }
192
193
194
195
196 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
197 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
198 LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
199 return attach(host, outboundPort, inboundPort, parser, llp, tls);
200 }
201
202 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
203 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
204 LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
205 return attachLazily(host, outboundPort, inboundPort, parser, llp, tls);
206 }
207
208
209
210
211 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
212 LowerLayerProtocol llp, boolean tls) throws HL7Exception {
213 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, false));
214 }
215
216
217
218
219 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
220 LowerLayerProtocol llp, boolean tls) throws HL7Exception {
221 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, true));
222 }
223
224
225
226
227 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
228 boolean tls, SocketFactory socketFactory) throws HL7Exception {
229 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, false));
230 }
231
232
233
234
235 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
236 boolean tls, SocketFactory socketFactory) throws HL7Exception {
237 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, true));
238 }
239
240
241
242
243 public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
244 boolean tls, SocketFactory socketFactory) throws HL7Exception {
245 return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, false));
246 }
247
248
249
250
251 public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
252 boolean tls, SocketFactory socketFactory) throws HL7Exception {
253 return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, true));
254 }
255
256
257
258
259 public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
260 return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
261 tls, hapiContext.getSocketFactory(), false));
262 }
263
264
265
266
267 public Connection attachLazily(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
268 return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
269 tls, hapiContext.getSocketFactory(), true));
270 }
271
272
273
274
275 public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
276 return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
277 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), false));
278 }
279
280
281
282
283 public Connection attachLazily(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
284 return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
285 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), true));
286 }
287
288
289
290
291 public Connection attach(String host, int port, Parser parser,
292 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
293 return attach(host, port, parser, llpClass, false);
294 }
295
296
297
298
299 public Connection attach(String host, int port, Parser parser,
300 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
301 return attach(host, port, 0, parser, llpClass, tls);
302 }
303
304
305
306
307 public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
308 throws HL7Exception {
309 return attach(host, port, 0, parser, llp, false);
310 }
311
312
313
314
315 public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp)
316 throws HL7Exception {
317 return attachLazily(host, port, 0, parser, llp, false);
318 }
319
320
321
322
323
324 public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
325 boolean tls) throws HL7Exception {
326 return attach(host, port, 0, parser, llp, tls);
327 }
328
329
330
331
332 public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
333 boolean tls) throws HL7Exception {
334 return attachLazily(host, port, 0, parser, llp, tls);
335 }
336
337
338
339
340
341
342 public void detach(Connection c) {
343 ConnectionData cd = connections.find(c);
344 if (cd != null)
345 connections.remove(cd);
346 }
347
348
349
350
351
352
353 public void discard(Connection c) {
354 ConnectionData cd = connections.find(c);
355 if (cd != null)
356 connections.removeAllOf(cd);
357 }
358
359
360
361
362 public void discardAll() {
363 for (ConnectionData cd : allConnections()) {
364 connections.removeAllOf(cd);
365 }
366 }
367
368 private void discardConnectionIfStale(Connection conn) {
369 if (conn != null && !conn.isOpen()) {
370 log.info("Discarding connection which appears to be closed. Remote addr: {}",
371 conn.getRemoteAddress());
372 discard(conn);
373 }
374 }
375
376 public Connection getKnownConnection(ConnectionData key) {
377 return connections.get(key);
378 }
379
380 public boolean isOpen(ConnectionData key) {
381 return getKnownConnection(key).isOpen();
382 }
383
384
385
386
387
388
389
390 public static ConnectionHub getInstance() {
391 return getInstance(DefaultExecutorService.getDefaultService());
392 }
393
394
395
396
397
398
399
400 public synchronized static ConnectionHub getInstance(ExecutorService service) {
401 if (instance == null || service.isShutdown()) {
402 instance = new ConnectionHub(service);
403 }
404 return instance;
405 }
406
407
408
409
410
411
412
413 public static ConnectionHub getInstance(HapiContext context) {
414 if (instance == null || context.getExecutorService().isShutdown()) {
415 instance = new ConnectionHub(context);
416 }
417 return instance;
418 }
419
420
421
422
423
424
425
426
427
428
429
430 public synchronized static ConnectionHub getNewInstance(HapiContext context) {
431 return new ConnectionHub(context);
432 }
433
434
435
436
437 public static void shutdown() {
438 ConnectionHub hub = getInstance();
439 if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
440 hub.getHapiContext().getExecutorService().shutdown();
441 instance = null;
442 }
443 }
444
445
446
447
448
449
450
451
452 private abstract static class CountingMap<K, D> {
453 private final Map<K, Count> content;
454
455 public CountingMap() {
456 super();
457 content = new ConcurrentHashMap<>();
458 }
459
460 protected abstract void dispose(D value);
461
462 public K find(D value) {
463 for (Map.Entry<K, Count> entry : content.entrySet()) {
464 if (entry.getValue().getValue().equals(value)) {
465 return entry.getKey();
466 }
467 }
468 return null;
469 }
470
471 public D get(K key) {
472 return content.containsKey(key) ? content.get(key).getValue() : null;
473 }
474
475 public Set<K> keySet() {
476 return Collections.unmodifiableSet(content.keySet());
477 }
478
479 protected abstract D open(K key) throws Exception;
480
481
482
483
484
485 public D put(K key) throws Exception {
486 if (content.containsKey(key)) {
487 return content.put(key, content.get(key).increase()).getValue();
488 } else {
489 Count c = new Count(open(key));
490 content.put(key, c);
491 return c.getValue();
492 }
493 }
494
495
496
497
498
499 public D remove(K key) {
500 Count pair = content.get(key);
501 if (pair == null)
502 return null;
503 if (pair.isLast()) {
504 return removeAllOf(key);
505 }
506 return content.put(key, content.get(key).decrease()).getValue();
507 }
508
509
510
511
512 public D removeAllOf(K key) {
513 D removed = content.remove(key).value;
514 dispose(removed);
515 return removed;
516 }
517
518 private class Count {
519 private final int count;
520 private final D value;
521
522 public Count(D value) {
523 this(value, 1);
524 }
525
526 private Count(D value, int number) {
527 this.value = value;
528 this.count = number;
529 }
530
531 Count decrease() {
532 return !isLast() ? new Count(value, count - 1) : null;
533 }
534
535 public D getValue() {
536 return value;
537 }
538
539 Count increase() {
540 return new Count(value, count + 1);
541 }
542
543 boolean isLast() {
544 return count == 1;
545 }
546
547 }
548
549 }
550
551 }