/**
The contents of this file are subject to the Mozilla Public License Version 1.1
(the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the
specific language governing rights and limitations under the License.

The Original Code is "ConnectionHub.java".  Description:
"Provides access to shared HL7 Connections"

The Initial Developer of the Original Code is University Health Network. Copyright (C)
2001.  All Rights Reserved.

Contributor(s): ______________________________________.

Alternatively, the contents of this file may be used under the terms of the
GNU General Public License (the "GPL"), in which case the provisions of the GPL are
applicable instead of those above.  If you wish to allow use of your version of this
file only under the terms of the GPL and not to allow others to use your version
of this file under the MPL, indicate your decision by deleting the provisions above
and replace them with the notice and other provisions required by the GPL License.
If you do not delete the provisions above, a recipient may use your version of
this file under either the MPL or the GPL.
 */
027package ca.uhn.hl7v2.app;
029import java.util.Collections;
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.ExecutorService;
036import ca.uhn.hl7v2.DefaultHapiContext;
037import ca.uhn.hl7v2.HL7Exception;
038import ca.uhn.hl7v2.HapiContext;
039import ca.uhn.hl7v2.HapiContextSupport;
040import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
041import ca.uhn.hl7v2.llp.LowerLayerProtocol;
042import ca.uhn.hl7v2.parser.Parser;
043import ca.uhn.hl7v2.util.ReflectionUtil;
044import ca.uhn.hl7v2.util.SocketFactory;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
/**
 * <p>
 * Provides access to shared HL7 Connections. The ConnectionHub has at most one connection to any
 * given address at any time.
 * </p>
 * <p>
 * <b>Synchronization Note:</b> This class should be safe to use in a multithreaded environment. A
 * synchronization mutex is maintained for any given target host and port, so that if two threads
 * are trying to connect to two separate destinations neither will block, but if two threads are
 * trying to connect to the same destination, one will block until the other has finished trying.
 * Use caution if this class is to be used in an environment where a very large (over 1000) number
 * of target host/port destinations will be accessed at the same time.
 * </p>
 * 
 * @author Bryan Tripp
 */
064public class ConnectionHub extends HapiContextSupport {
066        private static volatile ConnectionHub instance = null;
067        private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
068        /**
069         * Set a system property with this key to a string containing an integer larger than the default
070         * ("1000") if you need to connect to a very large number of targets at the same time in a
071         * multithreaded environment.
072         */
073        public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
074        private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>();
075        private final CountingMap<ConnectionData, Connection> connections;
077        /** Creates a new instance of ConnectionHub */
078        private ConnectionHub(ExecutorService executorService) {
079                this(new DefaultHapiContext(executorService));
080        }
082        private ConnectionHub(HapiContext context) {
083                super(context);
084                connections = new CountingMap<ConnectionData, Connection>() {
086                        @Override
087                        protected void dispose(Connection connection) {
088                                connection.close();
089                        }
091                        @Override
092                        protected Connection open(ConnectionData connectionData) throws Exception {
093                                return ConnectionFactory
094                                                .open(connectionData, getHapiContext().getExecutorService());
095                        }
097                };
098        }
100        public Set<? extends ConnectionData> allConnections() {
101                return connections.keySet();
102        }
104        /**
105         * @since 2.0
106         */
107        public Connection attach(ConnectionData data) throws HL7Exception {
108                try {
109                        Connection conn = null;
110                        // Disallow establishing same connection targets concurrently
111                        connectionMutexes.putIfAbsent(data.toString(), data.toString());
112                        String mutex = connectionMutexes.get(data.toString());
113                        synchronized (mutex) {
114                                discardConnectionIfStale(connections.get(data));
115                                // Create connection or increase counter
116                                conn = connections.put(data);
117                        }
118                        return conn;
119                } catch (Exception e) {
120                        log.debug("Failed to attach", e);
121                        throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
122                                        + data.getPort() + "/" + data.getPort2(), e);
123                }
124        }
126    /**
127     * Returns a Connection to the given address, opening this Connection if necessary. The given
128     * Parser will only be used if a new Connection is opened, so there is no guarantee that the
129     * Connection returned will be using the Parser you provide. If you need explicit access to the
130     * Parser the Connection is using, call <code>Connection.getParser()</code>.
131     *
132     * @since 2.1
133     */
134    public Connection attach(String host, int port, boolean tls) throws HL7Exception {
135        return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
136                getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
137                .getSocketFactory(), false));
138    }
140        /**
141         * Returns a Connection to the given address, opening this Connection if necessary. The given
142         * Parser will only be used if a new Connection is opened, so there is no guarantee that the
143         * Connection returned will be using the Parser you provide. If you need explicit access to the
144         * Parser the Connection is using, call <code>Connection.getParser()</code>.
145         * 
146         * @since 2.2
147         */
148        public Connection attachLazily(String host, int port, boolean tls) throws HL7Exception {
149                return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
150                                getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
151                                                .getSocketFactory(), true));
152        }
154    /**
155     * @since 2.0
156     */
157    public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
158        return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
159                .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
160                getHapiContext().getSocketFactory(), false));
161    }
163        /**
164         * @since 2.2
165         */
166        public Connection attachLazily(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
167                return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
168                                .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
169                                getHapiContext().getSocketFactory(), true));
170        }
172        /**
173         * @since 2.0
174         */
175        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
176                        Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
177                return attach(host, outboundPort, inboundPort, parser, llpClass, false);
178        }
180    /**
181     * @since 2.0
182     */
183    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
184                             Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
185        return attachLazily(host, outboundPort, inboundPort, parser, llpClass, false);
186    }
188        /**
189         * @since 2.0
190         */
191        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
192                Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
193                LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
194                return attach(host, outboundPort, inboundPort, parser, llp, tls);
195        }
197    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
198                             Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
199        LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
200        return attachLazily(host, outboundPort, inboundPort, parser, llp, tls);
201    }
203        /**
204         * @since 2.0
205         */
206        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
207                        LowerLayerProtocol llp, boolean tls) throws HL7Exception {
208                return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, false));
209        }
211    /**
212     * @since 2.2
213     */
214    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
215                             LowerLayerProtocol llp, boolean tls) throws HL7Exception {
216        return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, true));
217    }
219        /**
220         * @since 2.1
221         */
222        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
223                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
224                return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, false));
225        }
227    /**
228     * @since 2.1
229     */
230    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
231                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
232        return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, true));
233    }
235        /**
236         * @since 2.1
237         */
238        public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
239                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
240                return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, false));
241        }
243    /**
244     * @since 2.1
245     */
246    public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
247                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
248        return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, true));
249    }
251        /**
252         * @since 2.1
253         */
254        public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
255                return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
256                tls, hapiContext.getSocketFactory(), false));
257        }
259    /**
260     * @since 2.2
261     */
262    public Connection attachLazily(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
263        return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
264                tls, hapiContext.getSocketFactory(), true));
265    }
267        /**
268         * @since 2.1
269         */
270        public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
271                return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
272                hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), false));
273        }
275    /**
276     * @since 2.2
277     */
278    public Connection attachLazily(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
279        return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
280                hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), true));
281    }
283        /**
284         * @since 1.2
285         */
286        public Connection attach(String host, int port, Parser parser,
287                        Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
288                return attach(host, port, parser, llpClass, false);
289        }
291        /**
292         * @since 2.0
293         */
294        public Connection attach(String host, int port, Parser parser,
295                        Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
296                return attach(host, port, 0, parser, llpClass, tls);
297        }
299        /**
300         * @since 2.0
301         */
302        public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
303                        throws HL7Exception {
304                return attach(host, port, 0, parser, llp, false);
305        }
307    /**
308     * @since 2.2
309     */
310    public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp)
311            throws HL7Exception {
312        return attachLazily(host, port, 0, parser, llp, false);
313    }
316        /**
317         * @since 2.0
318         */
319        public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
320                        boolean tls) throws HL7Exception {
321                return attach(host, port, 0, parser, llp, tls);
322        }
324    /**
325     * @since 2.0
326     */
327    public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
328                             boolean tls) throws HL7Exception {
329        return attachLazily(host, port, 0, parser, llp, tls);
330    }
332        /**
333         * Informs the ConnectionHub that you are done with the given Connection - if no other code is
334         * using it, it will be closed, so you should not attempt to use a Connection after detaching
335         * from it. If the connection is not enlisted, this method does nothing.
336         */
337        public void detach(Connection c) {
338                ConnectionData cd = connections.find(c);
339                if (cd != null)
340                        connections.remove(cd);
341        }
343        /**
344         * Closes and discards the given Connection so that it can not be returned in subsequent calls
345         * to attach(). This method is to be used when there is a problem with a Connection, e.g. socket
346         * connection closed by remote host.
347         */
348        public void discard(Connection c) {
349                ConnectionData cd = connections.find(c);
350                if (cd != null)
351                        connections.removeAllOf(cd);
352        }
354    /**
355     * Closes and discards all connections.
356     */
357        public void discardAll() {
358                for (ConnectionData cd : allConnections()) {
359                        connections.removeAllOf(cd);
360                }
361        }
363        private void discardConnectionIfStale(Connection conn) {
364                if (conn != null && !conn.isOpen()) {
365                        log.info("Discarding connection which appears to be closed. Remote addr: {}",
366                                        conn.getRemoteAddress());
367                        discard(conn);
368                        conn = null;
369                }
370        }
372        public Connection getKnownConnection(ConnectionData key) {
373                return connections.get(key);
374        }
376        public boolean isOpen(ConnectionData key) {
377                return getKnownConnection(key).isOpen();
378        }
380        /**
381         * Returns the singleton instance of ConnectionHub
382         * 
383         * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
384         *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
385         */
386        public static ConnectionHub getInstance() {
387                return getInstance(DefaultExecutorService.getDefaultService());
388        }
390        /**
391         * Returns the singleton instance of ConnectionHub.
392         * 
393         * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
394         *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
395         */
396        public synchronized static ConnectionHub getInstance(ExecutorService service) {
397                if (instance == null || service.isShutdown()) {
398                        instance = new ConnectionHub(service);
399                }
400                return instance;
401        }
403        /**
404         * Returns the singleton instance of ConnectionHub.
405         * 
406         * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
407         *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
408         */
409        public static ConnectionHub getInstance(HapiContext context) {
410                if (instance == null || context.getExecutorService().isShutdown()) {
411                        instance = new ConnectionHub(context);
412                }
413                return instance;
414        }
416        /**
417         * <p>
418         * Returns a new (non-singleton) instance of the ConnectionHub which uses the given executor
419         * service.
420         * </p>
421         * <p>
422         * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a>
423         * for an example of how to use ConnectionHub.
424         * </p>
425         */
426        public synchronized static ConnectionHub getNewInstance(HapiContext context) {
427                return new ConnectionHub(context);
428        }
430        /**
431         * @deprecated default executor service is shut down automatically
432         */
433        public static void shutdown() {
434                ConnectionHub hub = getInstance();
435                if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
436                        hub.getHapiContext().getExecutorService().shutdown();
437                        instance = null;
438                }
439        }
441        /**
442         * Helper class that implements a map that increases/decreases a counter when an entry is
443         * added/removed. It is furthermore intended that an entry's value is derived from its key.
444         * 
445         * @param <K> key class
446         * @param <D> managed value class
447         */
448        private abstract class CountingMap<K, D> {
449                private Map<K, Count> content;
451                public CountingMap() {
452                        super();
453                        content = new ConcurrentHashMap<K, Count>();
454                }
456                protected abstract void dispose(D value);
458                public K find(D value) {
459                        for (Map.Entry<K, Count> entry : content.entrySet()) {
460                                if (entry.getValue().getValue().equals(value)) {
461                                        return entry.getKey();
462                                }
463                        }
464                        return null;
465                }
467                public D get(K key) {
468                        return content.containsKey(key) ? content.get(key).getValue() : null;
469                }
471                public Set<K> keySet() {
472                        return Collections.unmodifiableSet(content.keySet());
473                }
475                protected abstract D open(K key) throws Exception;
477                /**
478                 * If the key exists, the counter is increased. Otherwise, a value is created, and the
479                 * key/value pair is added to the map.
480                 */
481                public D put(K key) throws Exception {
482                        if (content.containsKey(key)) {
483                                return content.put(key, content.get(key).increase()).getValue();
484                        } else {
485                                Count c = new Count(open(key));
486                                content.put(key, c);
487                                return c.getValue();
488                        }
489                }
491                /**
492                 * If the counter of the key/value is greater than one, the counter is decreased. Otherwise,
493                 * the entry is removed and the value is cleaned up.
494                 */
495                public D remove(K key) {
496                        Count pair = content.get(key);
497                        if (pair == null)
498                                return null;
499                        if (pair.isLast()) {
500                                return removeAllOf(key);
501                        }
502                        return content.put(key, content.get(key).decrease()).getValue();
503                }
505                /**
506                 * The key/value entry is removed and the value is cleaned up.
507                 */
508                public D removeAllOf(K key) {
509                        D removed = content.remove(key).value;
510                        dispose(removed);
511                        return removed;
512                }
514                private class Count {
515                        private int count;
516                        private D value;
518                        public Count(D value) {
519                                this(value, 1);
520                        }
522                        private Count(D value, int number) {
523                                this.value = value;
524                                this.count = number;
525                        }
527                        Count decrease() {
528                                return !isLast() ? new Count(value, count - 1) : null;
529                        }
531                        public D getValue() {
532                                return value;
533                        }
535                        Count increase() {
536                                return new Count(value, count + 1);
537                        }
539                        boolean isLast() {
540                                return count == 1;
541                        }
543                }
545        }