001package ca.uhn.hl7v2.concurrent;
002
003import java.util.Collection;
004import java.util.Map;
005import java.util.Set;
006import java.util.concurrent.Callable;
007import java.util.concurrent.ConcurrentHashMap;
008import java.util.concurrent.ConcurrentMap;
009import java.util.concurrent.CountDownLatch;
010import java.util.concurrent.ExecutorService;
011import java.util.concurrent.Executors;
012import java.util.concurrent.Future;
013import java.util.concurrent.TimeUnit;
014
015/**
016 * Default Implementation of a {@link BlockingMap}.
017 * <p>
018 * Note: While it is not actively prevented that more then one thread waits for
019 * an entry, it is not guaranteed that all waiting threads will receive the
020 * entry once it became available. Other implementations may choose to count the
021 * waiting threads and/or to remove an available value after a grace period.
022 * 
023 * @param <K>
024 * @param <V>
025 */
026public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
027
028        private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>();
029        private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<K, CountDownLatch>();
030        private final ExecutorService executor;
031        
032        public BlockingHashMap() {
033                this(Executors.newCachedThreadPool());
034        }
035        
036        public BlockingHashMap(ExecutorService executor) {
037                super();
038                this.executor = executor;
039        }
040
041        /**
042         * Returns the keys of available entries
043         * 
044         * @see java.util.Map#keySet()
045         */
046        public Set<K> keySet() {
047                return map.keySet();
048        }
049
050        /**
051         * Returns an available entry without removing it from the map
052         * 
053         * @see java.util.Map#get(java.lang.Object)
054         */
055        public V get(Object key) {
056                return map.get(key);
057        }
058
059        /**
060         * Returns <code>true</code> if an entry with the given key is available
061         * 
062         * @see java.util.Map#containsKey(java.lang.Object)
063         */
064        public boolean containsKey(Object key) {
065                return map.containsKey(key);
066        }
067
068        /**
069         * @see java.util.Map#put(java.lang.Object, java.lang.Object)
070         */
071        synchronized public V put(K key, V value) {
072                V result = map.put(key, value);
073                latchFor(key).countDown();
074                return result;
075        }
076
077        /**
078         * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object,
079         *      java.lang.Object)
080         */
081        synchronized public boolean give(K key, V value) {
082                if (!latches.containsKey(key)) {
083                        return false;
084                }
085                put(key, value);
086                return true;
087        }
088
089        public V take(K key) throws InterruptedException {
090                latchFor(key).await();
091                latches.remove(key);
092                return map.remove(key); // likely to fail there are n > 1 consumers
093        }
094        
095
096        public Future<V> asyncTake(final K key) throws InterruptedException {
097                latchFor(key);
098                return executor.submit(new Callable<V>() {
099
100                        public V call() throws Exception {
101                                return take(key);
102                        }
103                });
104        }
105
106        public V poll(K key, long timeout, TimeUnit unit)
107                        throws InterruptedException {
108                if (latchFor(key).await(timeout, unit)) {
109                        latches.remove(key);
110                        return map.remove(key);
111                }
112                return null;
113        }
114        
115        public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
116                latchFor(key);
117                return executor.submit(new Callable<V>() {
118
119                        public V call() throws Exception {
120                                return poll(key, timeout, unit);
121                        }
122                });             
123        }
124        
125
126        /**
127         * Returns true if no entry is available for consumers
128         * 
129         * @see java.util.Map#isEmpty()
130         */
131        public boolean isEmpty() {
132                return map.isEmpty();
133        }
134
135        /**
136         * Returns the number of available values
137         * 
138         * @see java.util.Map#size()
139         */
140        public int size() {
141                return map.size();
142        }
143
144        /**
145         * Removes an entry, regardless whether a value has been set or not. Waiting
146         * consumers will receive a null value.
147         * 
148         * @see java.util.Map#remove(java.lang.Object)
149         */
150        synchronized public V remove(Object key) {
151                V result = map.remove(key);
152                CountDownLatch latch = latches.remove(key);
153                if (latch != null)
154                        latch.countDown();
155                return result;
156        }
157
158        /**
159         * Clears all existing entries. Waiting consumers will receive a null value
160         * for each removed entry.
161         * 
162         * @see java.util.Map#clear()
163         */
164        public void clear() {
165                for (K key : latches.keySet()) {
166                        remove(key);
167                }
168        }
169
170        public Collection<V> values() {
171                return map.values();
172        }
173
174        public Set<java.util.Map.Entry<K, V>> entrySet() {
175                return map.entrySet();
176        }
177
178        public void putAll(Map<? extends K, ? extends V> t) {
179                for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
180                        put(entry.getKey(), entry.getValue());
181                }
182        }
183
184        public boolean containsValue(Object value) {
185                return map.containsValue(value);
186        }
187
188        private synchronized CountDownLatch latchFor(K key) {
189                CountDownLatch latch = latches.get(key);
190                if (latch == null) {
191                        latch = new CountDownLatch(1);
192                        latches.put(key, latch);
193                }
194                return latch;
195        }
196
197}