001/**
002The contents of this file are subject to the Mozilla Public License Version 1.1 
003(the "License"); you may not use this file except in compliance with the License. 
004You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
005Software distributed under the License is distributed on an "AS IS" basis, 
006WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
007specific language governing rights and limitations under the License. 
008
009The Original Code is "ConnectionHub.java".  Description: 
010"Provides access to shared HL7 Connections" 
011
012The Initial Developer of the Original Code is University Health Network. Copyright (C) 
0132001.  All Rights Reserved. 
014
015Contributor(s): ______________________________________. 
016
017Alternatively, the contents of this file may be used under the terms of the 
018GNU General Public License (the  �GPL�), in which case the provisions of the GPL are 
019applicable instead of those above.  If you wish to allow use of your version of this 
020file only under the terms of the GPL and not to allow others to use your version 
021of this file under the MPL, indicate your decision by deleting  the provisions above 
022and replace  them with the notice and other provisions required by the GPL License.  
023If you do not delete the provisions above, a recipient may use your version of 
024this file under either the MPL or the GPL. 
025 */
026
027package ca.uhn.hl7v2.app;
028
029import java.util.Collections;
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.ExecutorService;
035
036import ca.uhn.hl7v2.DefaultHapiContext;
037import ca.uhn.hl7v2.HL7Exception;
038import ca.uhn.hl7v2.HapiContext;
039import ca.uhn.hl7v2.HapiContextSupport;
040import ca.uhn.hl7v2.concurrent.DefaultExecutorService;
041import ca.uhn.hl7v2.llp.LowerLayerProtocol;
042import ca.uhn.hl7v2.parser.Parser;
043import ca.uhn.hl7v2.util.ReflectionUtil;
044import ca.uhn.hl7v2.util.SocketFactory;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * <p>
050 * Provides access to shared HL7 Connections. The ConnectionHub has at most one connection to any
051 * given address at any time.
052 * </p>
053 * <p>
054 * <b>Synchronization Note:</b> This class should be safe to use in a multithreaded environment. A
055 * synchronization mutex is maintained for any given target host and port, so that if two threads
056 * are trying to connect to two separate destinations neither will block, but if two threads are
057 * trying to connect to the same destination, one will block until the other has finished trying.
058 * Use caution if this class is to be used in an environment where a very large (over 1000) number
059 * of target host/port destinations will be accessed at the same time.
060 * </p>
061 * 
062 * @author Bryan Tripp
063 */
064public class ConnectionHub extends HapiContextSupport {
065
066        private static volatile ConnectionHub instance = null;
067        private static final Logger log = LoggerFactory.getLogger(ConnectionHub.class);
068        /**
069         * Set a system property with this key to a string containing an integer larger than the default
070         * ("1000") if you need to connect to a very large number of targets at the same time in a
071         * multithreaded environment.
072         */
073        public static final String MAX_CONCURRENT_TARGETS = ConnectionHub.class.getName() + ".maxSize";
074        private final ConcurrentMap<String, String> connectionMutexes = new ConcurrentHashMap<String, String>();
075        private final CountingMap<ConnectionData, Connection> connections;
076
077        /** Creates a new instance of ConnectionHub */
078        private ConnectionHub(ExecutorService executorService) {
079                this(new DefaultHapiContext(executorService));
080        }
081
082        private ConnectionHub(HapiContext context) {
083                super(context);
084                connections = new CountingMap<ConnectionData, Connection>() {
085
086                        @Override
087                        protected void dispose(Connection connection) {
088                                connection.close();
089                        }
090
091                        @Override
092                        protected Connection open(ConnectionData connectionData) throws Exception {
093                                return ConnectionFactory
094                                                .open(connectionData, getHapiContext().getExecutorService());
095                        }
096
097                };
098        }
099
100        public Set<? extends ConnectionData> allConnections() {
101                return connections.keySet();
102        }
103
104        /**
105         * @since 2.0
106         */
107        public Connection attach(ConnectionData data) throws HL7Exception {
108                try {
109                        Connection conn = null;
110                        // Disallow establishing same connection targets concurrently
111                        connectionMutexes.putIfAbsent(data.toString(), data.toString());
112                        String mutex = connectionMutexes.get(data.toString());
113                        synchronized (mutex) {
114                                discardConnectionIfStale(connections.get(data));
115                                // Create connection or increase counter
116                                conn = connections.put(data);
117                        }
118                        return conn;
119                } catch (Exception e) {
120                        log.debug("Failed to attach", e);
121                        throw new HL7Exception("Cannot open connection to " + data.getHost() + ":"
122                                        + data.getPort() + "/" + data.getPort2(), e);
123                }
124        }
125
126    /**
127     * Returns a Connection to the given address, opening this Connection if necessary. The given
128     * Parser will only be used if a new Connection is opened, so there is no guarantee that the
129     * Connection returned will be using the Parser you provide. If you need explicit access to the
130     * Parser the Connection is using, call <code>Connection.getParser()</code>.
131     *
132     * @since 2.1
133     */
134    public Connection attach(String host, int port, boolean tls) throws HL7Exception {
135        return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
136                getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
137                .getSocketFactory(), false));
138    }
139
140        /**
141         * Returns a Connection to the given address, opening this Connection if necessary. The given
142         * Parser will only be used if a new Connection is opened, so there is no guarantee that the
143         * Connection returned will be using the Parser you provide. If you need explicit access to the
144         * Parser the Connection is using, call <code>Connection.getParser()</code>.
145         * 
146         * @since 2.2
147         */
148        public Connection attachLazily(String host, int port, boolean tls) throws HL7Exception {
149                return attach(new ConnectionData(host, port, 0, getHapiContext().getGenericParser(),
150                                getHapiContext().getLowerLayerProtocol(), tls, getHapiContext()
151                                                .getSocketFactory(), true));
152        }
153
154    /**
155     * @since 2.0
156     */
157    public Connection attach(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
158        return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
159                .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
160                getHapiContext().getSocketFactory(), false));
161    }
162
163        /**
164         * @since 2.2
165         */
166        public Connection attachLazily(String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
167                return attach(new ConnectionData(host, outboundPort, inboundPort, getHapiContext()
168                                .getGenericParser(), getHapiContext().getLowerLayerProtocol(), tls,
169                                getHapiContext().getSocketFactory(), true));
170        }
171
172        /**
173         * @since 2.0
174         */
175        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
176                        Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
177                return attach(host, outboundPort, inboundPort, parser, llpClass, false);
178        }
179
180    /**
181     * @since 2.0
182     */
183    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
184                             Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
185        return attachLazily(host, outboundPort, inboundPort, parser, llpClass, false);
186    }
187
188        /**
189         * @since 2.0
190         */
191        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
192                Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
193                LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
194                return attach(host, outboundPort, inboundPort, parser, llp, tls);
195        }
196
197    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
198                             Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
199        LowerLayerProtocol llp = ReflectionUtil.instantiate(llpClass);
200        return attachLazily(host, outboundPort, inboundPort, parser, llp, tls);
201    }
202
203        /**
204         * @since 2.0
205         */
206        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser,
207                        LowerLayerProtocol llp, boolean tls) throws HL7Exception {
208                return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, false));
209        }
210
211    /**
212     * @since 2.2
213     */
214    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser,
215                             LowerLayerProtocol llp, boolean tls) throws HL7Exception {
216        return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, null, true));
217    }
218
219        /**
220         * @since 2.1
221         */
222        public Connection attach(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
223                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
224                return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, false));
225        }
226
227    /**
228     * @since 2.1
229     */
230    public Connection attachLazily(String host, int outboundPort, int inboundPort, Parser parser, LowerLayerProtocol llp,
231                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
232        return attach(new ConnectionData(host, outboundPort, inboundPort, parser, llp, tls, socketFactory, true));
233    }
234
235        /**
236         * @since 2.1
237         */
238        public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
239                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
240                return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, false));
241        }
242
243    /**
244     * @since 2.1
245     */
246    public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
247                             boolean tls, SocketFactory socketFactory) throws HL7Exception {
248        return attach(new ConnectionData(host, port, 0, parser, llp, tls, socketFactory, true));
249    }
250
251        /**
252         * @since 2.1
253         */
254        public Connection attach(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
255                return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
256                tls, hapiContext.getSocketFactory(), false));
257        }
258
259    /**
260     * @since 2.2
261     */
262    public Connection attachLazily(DefaultHapiContext hapiContext, String host, int port, boolean tls) throws HL7Exception {
263        return attach(new ConnectionData(host, port, 0, hapiContext.getGenericParser(), hapiContext.getLowerLayerProtocol(),
264                tls, hapiContext.getSocketFactory(), true));
265    }
266
267        /**
268         * @since 2.1
269         */
270        public Connection attach(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
271                return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
272                hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), false));
273        }
274
275    /**
276     * @since 2.2
277     */
278    public Connection attachLazily(DefaultHapiContext hapiContext, String host, int outboundPort, int inboundPort, boolean tls) throws HL7Exception {
279        return attach(new ConnectionData(host, outboundPort, inboundPort, hapiContext.getGenericParser(),
280                hapiContext.getLowerLayerProtocol(), tls, hapiContext.getSocketFactory(), true));
281    }
282
283        /**
284         * @since 1.2
285         */
286        public Connection attach(String host, int port, Parser parser,
287                        Class<? extends LowerLayerProtocol> llpClass) throws HL7Exception {
288                return attach(host, port, parser, llpClass, false);
289        }
290
291        /**
292         * @since 2.0
293         */
294        public Connection attach(String host, int port, Parser parser,
295                        Class<? extends LowerLayerProtocol> llpClass, boolean tls) throws HL7Exception {
296                return attach(host, port, 0, parser, llpClass, tls);
297        }
298
299        /**
300         * @since 2.0
301         */
302        public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp)
303                        throws HL7Exception {
304                return attach(host, port, 0, parser, llp, false);
305        }
306
307    /**
308     * @since 2.2
309     */
310    public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp)
311            throws HL7Exception {
312        return attachLazily(host, port, 0, parser, llp, false);
313    }
314
315
316        /**
317         * @since 2.0
318         */
319        public Connection attach(String host, int port, Parser parser, LowerLayerProtocol llp,
320                        boolean tls) throws HL7Exception {
321                return attach(host, port, 0, parser, llp, tls);
322        }
323
324    /**
325     * @since 2.0
326     */
327    public Connection attachLazily(String host, int port, Parser parser, LowerLayerProtocol llp,
328                             boolean tls) throws HL7Exception {
329        return attachLazily(host, port, 0, parser, llp, tls);
330    }
331
332        /**
333         * Informs the ConnectionHub that you are done with the given Connection - if no other code is
334         * using it, it will be closed, so you should not attempt to use a Connection after detaching
335         * from it. If the connection is not enlisted, this method does nothing.
336         */
337        public void detach(Connection c) {
338                ConnectionData cd = connections.find(c);
339                if (cd != null)
340                        connections.remove(cd);
341        }
342
343        /**
344         * Closes and discards the given Connection so that it can not be returned in subsequent calls
345         * to attach(). This method is to be used when there is a problem with a Connection, e.g. socket
346         * connection closed by remote host.
347         */
348        public void discard(Connection c) {
349                ConnectionData cd = connections.find(c);
350                if (cd != null)
351                        connections.removeAllOf(cd);
352        }
353
354    /**
355     * Closes and discards all connections.
356     */
357        public void discardAll() {
358                for (ConnectionData cd : allConnections()) {
359                        connections.removeAllOf(cd);
360                }
361        }
362
363        private void discardConnectionIfStale(Connection conn) {
364                if (conn != null && !conn.isOpen()) {
365                        log.info("Discarding connection which appears to be closed. Remote addr: {}",
366                                        conn.getRemoteAddress());
367                        discard(conn);
368                        conn = null;
369                }
370        }
371
372        public Connection getKnownConnection(ConnectionData key) {
373                return connections.get(key);
374        }
375
376        public boolean isOpen(ConnectionData key) {
377                return getKnownConnection(key).isOpen();
378        }
379
380        /**
381         * Returns the singleton instance of ConnectionHub
382         * 
383         * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
384         *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
385         */
386        public static ConnectionHub getInstance() {
387                return getInstance(DefaultExecutorService.getDefaultService());
388        }
389
390        /**
391         * Returns the singleton instance of ConnectionHub.
392         * 
393         * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
394         *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
395         */
396        public synchronized static ConnectionHub getInstance(ExecutorService service) {
397                if (instance == null || service.isShutdown()) {
398                        instance = new ConnectionHub(service);
399                }
400                return instance;
401        }
402
403        /**
404         * Returns the singleton instance of ConnectionHub.
405         * 
406         * @deprecated Use {@link HapiContext#getConnectionHub()} to get an instance of ConnectionHub.
407         *             See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a> for an example of how to use ConnectionHub.
408         */
409        public static ConnectionHub getInstance(HapiContext context) {
410                if (instance == null || context.getExecutorService().isShutdown()) {
411                        instance = new ConnectionHub(context);
412                }
413                return instance;
414        }
415
416        /**
417         * <p>
418         * Returns a new (non-singleton) instance of the ConnectionHub which uses the given executor
419         * service.
420         * </p>
421         * <p>
422         * See <a href="http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/SendAndReceiveAMessage.html">this example page</a>
423         * for an example of how to use ConnectionHub.
424         * </p>
425         */
426        public synchronized static ConnectionHub getNewInstance(HapiContext context) {
427                return new ConnectionHub(context);
428        }
429
430        /**
431         * @deprecated default executor service is shut down automatically
432         */
433        public static void shutdown() {
434                ConnectionHub hub = getInstance();
435                if (DefaultExecutorService.isDefaultService(hub.getHapiContext().getExecutorService())) {
436                        hub.getHapiContext().getExecutorService().shutdown();
437                        instance = null;
438                }
439        }
440
441        /**
442         * Helper class that implements a map that increases/decreases a counter when an entry is
443         * added/removed. It is furthermore intended that an entry's value is derived from its key.
444         * 
445         * @param <K> key class
446         * @param <D> managed value class
447         */
448        private abstract class CountingMap<K, D> {
449                private Map<K, Count> content;
450
451                public CountingMap() {
452                        super();
453                        content = new ConcurrentHashMap<K, Count>();
454                }
455
456                protected abstract void dispose(D value);
457
458                public K find(D value) {
459                        for (Map.Entry<K, Count> entry : content.entrySet()) {
460                                if (entry.getValue().getValue().equals(value)) {
461                                        return entry.getKey();
462                                }
463                        }
464                        return null;
465                }
466
467                public D get(K key) {
468                        return content.containsKey(key) ? content.get(key).getValue() : null;
469                }
470
471                public Set<K> keySet() {
472                        return Collections.unmodifiableSet(content.keySet());
473                }
474
475                protected abstract D open(K key) throws Exception;
476
477                /**
478                 * If the key exists, the counter is increased. Otherwise, a value is created, and the
479                 * key/value pair is added to the map.
480                 */
481                public D put(K key) throws Exception {
482                        if (content.containsKey(key)) {
483                                return content.put(key, content.get(key).increase()).getValue();
484                        } else {
485                                Count c = new Count(open(key));
486                                content.put(key, c);
487                                return c.getValue();
488                        }
489                }
490
491                /**
492                 * If the counter of the key/value is greater than one, the counter is decreased. Otherwise,
493                 * the entry is removed and the value is cleaned up.
494                 */
495                public D remove(K key) {
496                        Count pair = content.get(key);
497                        if (pair == null)
498                                return null;
499                        if (pair.isLast()) {
500                                return removeAllOf(key);
501                        }
502                        return content.put(key, content.get(key).decrease()).getValue();
503                }
504
505                /**
506                 * The key/value entry is removed and the value is cleaned up.
507                 */
508                public D removeAllOf(K key) {
509                        D removed = content.remove(key).value;
510                        dispose(removed);
511                        return removed;
512                }
513
514                private class Count {
515                        private int count;
516                        private D value;
517
518                        public Count(D value) {
519                                this(value, 1);
520                        }
521
522                        private Count(D value, int number) {
523                                this.value = value;
524                                this.count = number;
525                        }
526
527                        Count decrease() {
528                                return !isLast() ? new Count(value, count - 1) : null;
529                        }
530
531                        public D getValue() {
532                                return value;
533                        }
534
535                        Count increase() {
536                                return new Count(value, count + 1);
537                        }
538
539                        boolean isLast() {
540                                return count == 1;
541                        }
542
543                }
544
545        }
546
547}