001package ca.uhn.hl7v2.concurrent; 002 003import java.util.Collection; 004import java.util.Map; 005import java.util.Set; 006import java.util.concurrent.Callable; 007import java.util.concurrent.ConcurrentHashMap; 008import java.util.concurrent.ConcurrentMap; 009import java.util.concurrent.CountDownLatch; 010import java.util.concurrent.ExecutorService; 011import java.util.concurrent.Executors; 012import java.util.concurrent.Future; 013import java.util.concurrent.TimeUnit; 014 015/** 016 * Default Implementation of a {@link BlockingMap}. 017 * <p> 018 * Note: While it is not actively prevented that more then one thread waits for 019 * an entry, it is not guaranteed that all waiting threads will receive the 020 * entry once it became available. Other implementations may choose to count the 021 * waiting threads and/or to remove an available value after a grace period. 022 * 023 * @param <K> 024 * @param <V> 025 */ 026public class BlockingHashMap<K, V> implements BlockingMap<K, V> { 027 028 private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>(); 029 private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<K, CountDownLatch>(); 030 private final ExecutorService executor; 031 032 public BlockingHashMap() { 033 this(Executors.newCachedThreadPool()); 034 } 035 036 public BlockingHashMap(ExecutorService executor) { 037 super(); 038 this.executor = executor; 039 } 040 041 /** 042 * Returns the keys of available entries 043 * 044 * @see java.util.Map#keySet() 045 */ 046 public Set<K> keySet() { 047 return map.keySet(); 048 } 049 050 /** 051 * Returns an available entry without removing it from the map 052 * 053 * @see java.util.Map#get(java.lang.Object) 054 */ 055 public V get(Object key) { 056 return map.get(key); 057 } 058 059 /** 060 * Returns <code>true</code> if an entry with the given key is available 061 * 062 * @see java.util.Map#containsKey(java.lang.Object) 063 */ 064 public boolean containsKey(Object key) { 065 return map.containsKey(key); 066 } 067 068 /** 069 * @see java.util.Map#put(java.lang.Object, java.lang.Object) 070 */ 071 synchronized public V put(K key, V value) { 072 V result = map.put(key, value); 073 latchFor(key).countDown(); 074 return result; 075 } 076 077 /** 078 * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object, 079 * java.lang.Object) 080 */ 081 synchronized public boolean give(K key, V value) { 082 if (!latches.containsKey(key)) { 083 return false; 084 } 085 put(key, value); 086 return true; 087 } 088 089 public V take(K key) throws InterruptedException { 090 latchFor(key).await(); 091 latches.remove(key); 092 return map.remove(key); // likely to fail there are n > 1 consumers 093 } 094 095 096 public Future<V> asyncTake(final K key) throws InterruptedException { 097 latchFor(key); 098 return executor.submit(new Callable<V>() { 099 100 public V call() throws Exception { 101 return take(key); 102 } 103 }); 104 } 105 106 public V poll(K key, long timeout, TimeUnit unit) 107 throws InterruptedException { 108 if (latchFor(key).await(timeout, unit)) { 109 latches.remove(key); 110 return map.remove(key); 111 } 112 return null; 113 } 114 115 public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) { 116 latchFor(key); 117 return executor.submit(new Callable<V>() { 118 119 public V call() throws Exception { 120 return poll(key, timeout, unit); 121 } 122 }); 123 } 124 125 126 /** 127 * Returns true if no entry is available for consumers 128 * 129 * @see java.util.Map#isEmpty() 130 */ 131 public boolean isEmpty() { 132 return map.isEmpty(); 133 } 134 135 /** 136 * Returns the number of available values 137 * 138 * @see java.util.Map#size() 139 */ 140 public int size() { 141 return map.size(); 142 } 143 144 /** 145 * Removes an entry, regardless whether a value has been set or not. Waiting 146 * consumers will receive a null value. 147 * 148 * @see java.util.Map#remove(java.lang.Object) 149 */ 150 synchronized public V remove(Object key) { 151 V result = map.remove(key); 152 CountDownLatch latch = latches.remove(key); 153 if (latch != null) 154 latch.countDown(); 155 return result; 156 } 157 158 /** 159 * Clears all existing entries. Waiting consumers will receive a null value 160 * for each removed entry. 161 * 162 * @see java.util.Map#clear() 163 */ 164 public void clear() { 165 for (K key : latches.keySet()) { 166 remove(key); 167 } 168 } 169 170 public Collection<V> values() { 171 return map.values(); 172 } 173 174 public Set<java.util.Map.Entry<K, V>> entrySet() { 175 return map.entrySet(); 176 } 177 178 public void putAll(Map<? extends K, ? extends V> t) { 179 for (Entry<? extends K, ? extends V> entry : t.entrySet()) { 180 put(entry.getKey(), entry.getValue()); 181 } 182 } 183 184 public boolean containsValue(Object value) { 185 return map.containsValue(value); 186 } 187 188 private synchronized CountDownLatch latchFor(K key) { 189 CountDownLatch latch = latches.get(key); 190 if (latch == null) { 191 latch = new CountDownLatch(1); 192 latches.put(key, latch); 193 } 194 return latch; 195 } 196 197}