Coverage Report - ca.uhn.hl7v2.concurrent.BlockingHashMap
 
Classes in this File Line Coverage Branch Coverage Complexity
BlockingHashMap
82%
42/51
75%
9/12
1.364
BlockingHashMap$1
100%
2/2
N/A
1.364
BlockingHashMap$2
100%
2/2
N/A
1.364
 
 1  
 package ca.uhn.hl7v2.concurrent;
 2  
 
 3  
 import java.util.Collection;
 4  
 import java.util.Map;
 5  
 import java.util.Set;
 6  
 import java.util.concurrent.Callable;
 7  
 import java.util.concurrent.ConcurrentHashMap;
 8  
 import java.util.concurrent.ConcurrentMap;
 9  
 import java.util.concurrent.CountDownLatch;
 10  
 import java.util.concurrent.ExecutorService;
 11  
 import java.util.concurrent.Executors;
 12  
 import java.util.concurrent.Future;
 13  
 import java.util.concurrent.TimeUnit;
 14  
 
 15  
 /**
 16  
  * Default Implementation of a {@link BlockingMap}.
 17  
  * <p>
 18  
  * Note: While it is not actively prevented that more then one thread waits for
 19  
  * an entry, it is not guaranteed that all waiting threads will receive the
 20  
  * entry once it became available. Other implementations may choose to count the
 21  
  * waiting threads and/or to remove an available value after a grace period.
 22  
  * 
 23  
  * @param <K>
 24  
  * @param <V>
 25  
  */
 26  
 public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
 27  
 
 28  489
         private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>();
 29  489
         private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<K, CountDownLatch>();
 30  
         private final ExecutorService executor;
 31  
         
 32  
         public BlockingHashMap() {
 33  0
                 this(Executors.newCachedThreadPool());
 34  0
         }
 35  
         
 36  
         public BlockingHashMap(ExecutorService executor) {
 37  489
                 super();
 38  489
                 this.executor = executor;
 39  489
         }
 40  
 
 41  
         /**
 42  
          * Returns the keys of available entries
 43  
          * 
 44  
          * @see java.util.Map#keySet()
 45  
          */
 46  
         public Set<K> keySet() {
 47  0
                 return map.keySet();
 48  
         }
 49  
 
 50  
         /**
 51  
          * Returns an available entry without removing it from the map
 52  
          * 
 53  
          * @see java.util.Map#get(java.lang.Object)
 54  
          */
 55  
         public V get(Object key) {
 56  10
                 return map.get(key);
 57  
         }
 58  
 
 59  
         /**
 60  
          * Returns <code>true</code> if an entry with the given key is available
 61  
          * 
 62  
          * @see java.util.Map#containsKey(java.lang.Object)
 63  
          */
 64  
         public boolean containsKey(Object key) {
 65  70
                 return map.containsKey(key);
 66  
         }
 67  
 
 68  
         /**
 69  
          * @see java.util.Map#put(java.lang.Object, java.lang.Object)
 70  
          */
 71  
         synchronized public V put(K key, V value) {
 72  5608
                 V result = map.put(key, value);
 73  5608
                 latchFor(key).countDown();
 74  5608
                 return result;
 75  
         }
 76  
 
 77  
         /**
 78  
          * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object,
 79  
          *      java.lang.Object)
 80  
          */
 81  
         synchronized public boolean give(K key, V value) {
 82  5588
                 if (!latches.containsKey(key)) {
 83  5
                         return false;
 84  
                 }
 85  5583
                 put(key, value);
 86  5583
                 return true;
 87  
         }
 88  
 
 89  
         public V take(K key) throws InterruptedException {
 90  25
                 latchFor(key).await();
 91  25
                 latches.remove(key);
 92  25
                 return map.remove(key); // likely to fail there are n > 1 consumers
 93  
         }
 94  
         
 95  
 
 96  
         public Future<V> asyncTake(final K key) throws InterruptedException {
 97  25
                 latchFor(key);
 98  25
                 return executor.submit(new Callable<V>() {
 99  
 
 100  
                         public V call() throws Exception {
 101  25
                                 return take(key);
 102  
                         }
 103  
                 });
 104  
         }
 105  
 
 106  
         public V poll(K key, long timeout, TimeUnit unit)
 107  
                         throws InterruptedException {
 108  5603
                 if (latchFor(key).await(timeout, unit)) {
 109  5588
                         latches.remove(key);
 110  5588
                         return map.remove(key);
 111  
                 }
 112  15
                 return null;
 113  
         }
 114  
         
 115  
         public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
 116  5603
                 latchFor(key);
 117  5603
                 return executor.submit(new Callable<V>() {
 118  
 
 119  
                         public V call() throws Exception {
 120  5603
                                 return poll(key, timeout, unit);
 121  
                         }
 122  
                 });                
 123  
         }
 124  
         
 125  
 
 126  
         /**
 127  
          * Returns true if no entry is available for consumers
 128  
          * 
 129  
          * @see java.util.Map#isEmpty()
 130  
          */
 131  
         public boolean isEmpty() {
 132  25
                 return map.isEmpty();
 133  
         }
 134  
 
 135  
         /**
 136  
          * Returns the number of available values
 137  
          * 
 138  
          * @see java.util.Map#size()
 139  
          */
 140  
         public int size() {
 141  5
                 return map.size();
 142  
         }
 143  
 
 144  
         /**
 145  
          * Removes an entry, regardless whether a value has been set or not. Waiting
 146  
          * consumers will receive a null value.
 147  
          * 
 148  
          * @see java.util.Map#remove(java.lang.Object)
 149  
          */
 150  
         synchronized public V remove(Object key) {
 151  10
                 V result = map.remove(key);
 152  10
                 CountDownLatch latch = latches.remove(key);
 153  10
                 if (latch != null)
 154  10
                         latch.countDown();
 155  10
                 return result;
 156  
         }
 157  
 
 158  
         /**
 159  
          * Clears all existing entries. Waiting consumers will receive a null value
 160  
          * for each removed entry.
 161  
          * 
 162  
          * @see java.util.Map#clear()
 163  
          */
 164  
         public void clear() {
 165  5
                 for (K key : latches.keySet()) {
 166  5
                         remove(key);
 167  5
                 }
 168  5
         }
 169  
 
 170  
         public Collection<V> values() {
 171  0
                 return map.values();
 172  
         }
 173  
 
 174  
         public Set<java.util.Map.Entry<K, V>> entrySet() {
 175  0
                 return map.entrySet();
 176  
         }
 177  
 
 178  
         public void putAll(Map<? extends K, ? extends V> t) {
 179  0
                 for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
 180  0
                         put(entry.getKey(), entry.getValue());
 181  0
                 }
 182  0
         }
 183  
 
 184  
         public boolean containsValue(Object value) {
 185  5
                 return map.containsValue(value);
 186  
         }
 187  
 
 188  
         private synchronized CountDownLatch latchFor(K key) {
 189  16864
                 CountDownLatch latch = latches.get(key);
 190  16864
                 if (latch == null) {
 191  5628
                         latch = new CountDownLatch(1);
 192  5628
                         latches.put(key, latch);
 193  
                 }
 194  16864
                 return latch;
 195  
         }
 196  
 
 197  
 }