001/** 002The contents of this file are subject to the Mozilla Public License Version 1.1 003(the "License"); you may not use this file except in compliance with the License. 004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 005Software distributed under the License is distributed on an "AS IS" basis, 006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 007specific language governing rights and limitations under the License. 008 009The Original Code is "ConnectionHub.java". Description: 010"Provides access to shared HL7 Connections" 011 012The Initial Developer of the Original Code is University Health Network. Copyright (C) 0132001. All Rights Reserved. 014 015Contributor(s): ______________________________________. 016 017Alternatively, the contents of this file may be used under the terms of the 018GNU General Public License (the �GPL�), in which case the provisions of the GPL are 019applicable instead of those above. If you wish to allow use of your version of this 020file only under the terms of the GPL and not to allow others to use your version 021of this file under the MPL, indicate your decision by deleting the provisions above 022and replace them with the notice and other provisions required by the GPL License. 023If you do not delete the provisions above, a recipient may use your version of 024this file under either the MPL or the GPL. 025 */ 026 027package ca.uhn.hl7v2.app; 028 029import java.util.Collections; 030import java.util.Map; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.ConcurrentMap; 034import java.util.concurrent.ExecutorService; 035 036import ca.uhn.hl7v2.DefaultHapiContext; 037import ca.uhn.hl7v2.HL7Exception; 038import ca.uhn.hl7v2.HapiContext; 039import ca.uhn.hl7v2.HapiContextSupport; 040import ca.uhn.hl7v2.concurrent.DefaultExecutorService; 041import ca.uhn.hl7v2.llp.LowerLayerProtocol; 042import ca.uhn.hl7v2.parser.Parser; 043import ca.uhn.hl7v2.util.ReflectionUtil; 044import ca.uhn.hl7v2.util.SocketFactory; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * <p> 050 * Provides access to shared HL7 Connections. The ConnectionHub has at most one connection to any 051 * given address at any time. 052 * </p> 053 * <p> 054 * <b>Synchronization Note:</b> This class should be safe to use in a multithreaded environment. A 055 * synchronization mutex is maintained for any given target host and port, so that if two threads 056 * are trying to connect to two separate destinations neither will block, but if two threads are 057 * trying to connect to the same destination, one will block until the other has finished trying. 058 * Use caution if this class is to be used in an environment where a very large (over 1000) number 059 * of target host/port destinations will be accessed at the same time. 060 * </p> 061 * 062 * @author Bryan Tripp 063 */ 064public class ConnectionHub extends HapiContextSupport { 065 066 private static volatile ConnectionHub instance = null; 067 private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class); 068 /** 069 * Set a system property with this key to a string containing an integer larger than the default 070 * ("1000") if you need to connect to a very large number of targets at the same time in a 071 * multithreaded environment. 072 */ 073 public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize"; 074 private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>(); 075 private final CountingMap<ConnectionData, Connection> connections; 076 077 /** Creates a new instance of ConnectionHub */ 078 private ConnectionHub(ExecutorService executorService) { 079 this(new DefaultHapiContext(executorService)); 080 } 081 082 private ConnectionHub(HapiContext context) { 083 super(context); 084 connections = new CountingMap<ConnectionData, Connection>() { 085 086 @Override 087 protected void dispose(Connection connection) { 088 connection.close(); 089 } 090 091 @Override 092 protected Connection open(ConnectionData connectionData) throws Exception { 093 return ConnectionFactory 094 .open(connectionData, getHapiContext().getExecutorService()); 095 } 096 097 }; 098 } 099 100 public Set<? extends ConnectionData> allConnections() { 101 return connections.keySet(); 102 } 103 104 /** 105 * @since 2.0 106 */ 107 public Connection attach(ConnectionData data) throws HL7Exception { 108 try { 109 Connection conn = null; 110 // Disallow establishing same connection targets concurrently 111 connectionMutexes.putIfAbsent(data.toString(), data.toString()); 112 String mutex = connectionMutexes.get(data.toString()); 113 synchronized (mutex) { 114 discardConnectionIfStale(connections.get(data)); 115 // Create connection or increase counter 116 conn = connections.put(data); 117 } 118 return conn; 119 } catch (Exception e) { 120 log.debug("Failed to attach", e); 121 throw new HL7Exception("Cannot open connection to " + data.getHost() + ":" 122 + data.getPort() + "/" + data.getPort2(), e); 123 } 124 } 125 126 /** 127 * Returns a Connection to the given address, opening this Connection if necessary. The given 128 * Parser will only be used if a new Connection is opened, so there is no guarantee that the 129 * Connection returned will be using the Parser you provide. If you need explicit access to the 130 * Parser the Connection is using, call <code>Connection.getParser()</code>. 131 * 132 * @since 2.1 133 */ 134 public Connection attach(String host, int port, boolean tls) throws HL7Exception { 135 return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(), 136 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext() 137 .getSocketFactory(), false)); 138 } 139 140 /** 141 * Returns a Connection to the given address, opening this Connection if necessary. The given 142 * Parser will only be used if a new Connection is opened, so there is no guarantee that the 143 * Connection returned will be using the Parser you provide. If you need explicit access to the 144 * Parser the Connection is using, call <code>Connection.getParser()</code>. 145 * 146 * @since 2.2 147 */ 148 public Connection attachLazily(String host, int port, boolean tls) throws HL7Exception { 149 return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(), 150 getHapiContext().getLowerLayerProtocol(), tls, getHapiContext() 151 .getSocketFactory(), true)); 152 } 153 154 /** 155 * @since 2.0 156 */ 157 public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception { 158 return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext() 159 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls, 160 getHapiContext().getSocketFactory(), false)); 161 } 162 163 /** 164 * @since 2.2 165 */ 166 public Connection attachLazily(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception { 167 return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext() 168 .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls, 169 getHapiContext().getSocketFactory(), true)); 170 } 171 172 /** 173 * @since 2.0 174 */ 175 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, 176 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception { 177 return attach(host, outboundPort, inboundPort, parser, llpClass, false); 178 } 179 180 /** 181 * @since 2.0 182 */ 183 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, 184 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception { 185 return attachLazily(host, outboundPort, inboundPort, parser, llpClass, false); 186 } 187 188 /** 189 * @since 2.0 190 */ 191 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, 192 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception { 193 LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass); 194 return attach(host, outboundPort, inboundPort, parser, llp, tls); 195 } 196 197 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, 198 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception { 199 LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass); 200 return attachLazily(host, outboundPort, inboundPort, parser, llp, tls); 201 } 202 203 /** 204 * @since 2.0 205 */ 206 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, 207 LowerLayerProtocol llp, boolean tls) throws HL7Exception { 208 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, false)); 209 } 210 211 /** 212 * @since 2.2 213 */ 214 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, 215 LowerLayerProtocol llp, boolean tls) throws HL7Exception { 216 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, true)); 217 } 218 219 /** 220 * @since 2.1 221 */ 222 public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp, 223 boolean tls, SocketFactory socketFactory) throws HL7Exception { 224 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, false)); 225 } 226 227 /** 228 * @since 2.1 229 */ 230 public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp, 231 boolean tls, SocketFactory socketFactory) throws HL7Exception { 232 return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, true)); 233 } 234 235 /** 236 * @since 2.1 237 */ 238 public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp, 239 boolean tls, SocketFactory socketFactory) throws HL7Exception { 240 return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, false)); 241 } 242 243 /** 244 * @since 2.1 245 */ 246 public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp, 247 boolean tls, SocketFactory socketFactory) throws HL7Exception { 248 return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, true)); 249 } 250 251 /** 252 * @since 2.1 253 */ 254 public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception { 255 return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(), 256 tls, hapiContext.getSocketFactory(), false)); 257 } 258 259 /** 260 * @since 2.2 261 */ 262 public Connection attachLazily(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception { 263 return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(), 264 tls, hapiContext.getSocketFactory(), true)); 265 } 266 267 /** 268 * @since 2.1 269 */ 270 public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception { 271 return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(), 272 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), false)); 273 } 274 275 /** 276 * @since 2.2 277 */ 278 public Connection attachLazily(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception { 279 return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(), 280 hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), true)); 281 } 282 283 /** 284 * @since 1.2 285 */ 286 public Connection attach(String host, int port, Parser parser, 287 Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception { 288 return attach(host, port, parser, llpClass, false); 289 } 290 291 /** 292 * @since 2.0 293 */ 294 public Connection attach(String host, int port, Parser parser, 295 Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception { 296 return attach(host, port, 0, parser, llpClass, tls); 297 } 298 299 /** 300 * @since 2.0 301 */ 302 public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp) 303 throws HL7Exception { 304 return attach(host, port, 0, parser, llp, false); 305 } 306 307 /** 308 * @since 2.2 309 */ 310 public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp) 311 throws HL7Exception { 312 return attachLazily(host, port, 0, parser, llp, false); 313 } 314 315 316 /** 317 * @since 2.0 318 */ 319 public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp, 320 boolean tls) throws HL7Exception { 321 return attach(host, port, 0, parser, llp, tls); 322 } 323 324 /** 325 * @since 2.0 326 */ 327 public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp, 328 boolean tls) throws HL7Exception { 329 return attachLazily(host, port, 0, parser, llp, tls); 330 } 331 332 /** 333 * Informs the ConnectionHub that you are done with the given Connection - if no other code is 334 * using it, it will be closed, so you should not attempt to use a Connection after detaching 335 * from it. If the connection is not enlisted, this method does nothing. 336 */ 337 public void detach(Connection c) { 338 ConnectionData cd = connections.find(c); 339 if (cd != null) 340 connections.remove(cd); 341 } 342 343 /** 344 * Closes and discards the given Connection so that it can not be returned in subsequent calls 345 * to attach(). This method is to be used when there is a problem with a Connection, e.g. socket 346 * connection closed by remote host. 347 */ 348 public void discard(Connection c) { 349 ConnectionData cd = connections.find(c); 350 if (cd != null) 351 connections.removeAllOf(cd); 352 } 353 354 /** 355 * Closes and discards all connections. 356 */ 357 public void discardAll() { 358 for (ConnectionData cd : allConnections()) { 359 connections.removeAllOf(cd); 360 } 361 } 362 363 private void discardConnectionIfStale(Connection conn) { 364 if (conn != null && !conn.isOpen()) { 365 log.info("Discarding connection which appears to be closed. Remote addr: {}", 366 conn.getRemoteAddress()); 367 discard(conn); 368 conn = null; 369 } 370 } 371 372 public Connection getKnownConnection(ConnectionData key) { 373 return connections.get(key); 374 } 375 376 public boolean isOpen(ConnectionData key) { 377 return getKnownConnection(key).isOpen(); 378 } 379 380 /** 381 * Returns the singleton instance of ConnectionHub 382 * 383 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub. 384 * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub. 385 */ 386 public static ConnectionHub getInstance() { 387 return getInstance(DefaultExecutorService.getDefaultService()); 388 } 389 390 /** 391 * Returns the singleton instance of ConnectionHub. 392 * 393 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub. 394 * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub. 395 */ 396 public synchronized static ConnectionHub getInstance(ExecutorService service) { 397 if (instance == null || service.isShutdown()) { 398 instance = new ConnectionHub(service); 399 } 400 return instance; 401 } 402 403 /** 404 * Returns the singleton instance of ConnectionHub. 405 * 406 * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub. 407 * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub. 408 */ 409 public static ConnectionHub getInstance(HapiContext context) { 410 if (instance == null || context.getExecutorService().isShutdown()) { 411 instance = new ConnectionHub(context); 412 } 413 return instance; 414 } 415 416 /** 417 * <p> 418 * Returns a new (non-singleton) instance of the ConnectionHub which uses the given executor 419 * service. 420 * </p> 421 * <p> 422 * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> 423 * for an example of how to use ConnectionHub. 424 * </p> 425 */ 426 public synchronized static ConnectionHub getNewInstance(HapiContext context) { 427 return new ConnectionHub(context); 428 } 429 430 /** 431 * @deprecated default executor service is shut down automatically 432 */ 433 public static void shutdown() { 434 ConnectionHub hub = getInstance(); 435 if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) { 436 hub.getHapiContext().getExecutorService().shutdown(); 437 instance = null; 438 } 439 } 440 441 /** 442 * Helper class that implements a map that increases/decreases a counter when an entry is 443 * added/removed. It is furthermore intended that an entry's value is derived from its key. 444 * 445 * @param <K> key class 446 * @param <D> managed value class 447 */ 448 private abstract class CountingMap<K, D> { 449 private Map<K, Count> content; 450 451 public CountingMap() { 452 super(); 453 content = new ConcurrentHashMap<K, Count>(); 454 } 455 456 protected abstract void dispose(D value); 457 458 public K find(D value) { 459 for (Map.Entry<K, Count> entry : content.entrySet()) { 460 if (entry.getValue().getValue().equals(value)) { 461 return entry.getKey(); 462 } 463 } 464 return null; 465 } 466 467 public D get(K key) { 468 return content.containsKey(key) ? content.get(key).getValue() : null; 469 } 470 471 public Set<K> keySet() { 472 return Collections.unmodifiableSet(content.keySet()); 473 } 474 475 protected abstract D open(K key) throws Exception; 476 477 /** 478 * If the key exists, the counter is increased. Otherwise, a value is created, and the 479 * key/value pair is added to the map. 480 */ 481 public D put(K key) throws Exception { 482 if (content.containsKey(key)) { 483 return content.put(key, content.get(key).increase()).getValue(); 484 } else { 485 Count c = new Count(open(key)); 486 content.put(key, c); 487 return c.getValue(); 488 } 489 } 490 491 /** 492 * If the counter of the key/value is greater than one, the counter is decreased. Otherwise, 493 * the entry is removed and the value is cleaned up. 494 */ 495 public D remove(K key) { 496 Count pair = content.get(key); 497 if (pair == null) 498 return null; 499 if (pair.isLast()) { 500 return removeAllOf(key); 501 } 502 return content.put(key, content.get(key).decrease()).getValue(); 503 } 504 505 /** 506 * The key/value entry is removed and the value is cleaned up. 507 */ 508 public D removeAllOf(K key) { 509 D removed = content.remove(key).value; 510 dispose(removed); 511 return removed; 512 } 513 514 private class Count { 515 private int count; 516 private D value; 517 518 public Count(D value) { 519 this(value, 1); 520 } 521 522 private Count(D value, int number) { 523 this.value = value; 524 this.count = number; 525 } 526 527 Count decrease() { 528 return !isLast() ? new Count(value, count - 1) : null; 529 } 530 531 public D getValue() { 532 return value; 533 } 534 535 Count increase() { 536 return new Count(value, count + 1); 537 } 538 539 boolean isLast() { 540 return count == 1; 541 } 542 543 } 544 545 } 546 547}