View Javadoc
1   package ca.uhn.hl7v2.concurrent;
2   
3   import java.util.Collection;
4   import java.util.Map;
5   import java.util.Set;
6   import java.util.concurrent.Callable;
7   import java.util.concurrent.ConcurrentHashMap;
8   import java.util.concurrent.ConcurrentMap;
9   import java.util.concurrent.CountDownLatch;
10  import java.util.concurrent.ExecutorService;
11  import java.util.concurrent.Executors;
12  import java.util.concurrent.Future;
13  import java.util.concurrent.TimeUnit;
14  
15  /**
16   * Default Implementation of a {@link BlockingMap}.
17   * <p>
18   * Note: While it is not actively prevented that more then one thread waits for
19   * an entry, it is not guaranteed that all waiting threads will receive the
20   * entry once it became available. Other implementations may choose to count the
21   * waiting threads and/or to remove an available value after a grace period.
22   * 
23   * @param <K>
24   * @param <V>
25   */
26  public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
27  
28  	private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();
29  	private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<>();
30  	private final ExecutorService executor;
31  	
32  	public BlockingHashMap() {
33  		this(Executors.newCachedThreadPool());
34  	}
35  	
36  	public BlockingHashMap(ExecutorService executor) {
37  		super();
38  		this.executor = executor;
39  	}
40  
41  	/**
42  	 * Returns the keys of available entries
43  	 * 
44  	 * @see java.util.Map#keySet()
45  	 */
46  	public Set<K> keySet() {
47  		return map.keySet();
48  	}
49  
50  	/**
51  	 * Returns an available entry without removing it from the map
52  	 * 
53  	 * @see java.util.Map#get(java.lang.Object)
54  	 */
55  	public V get(Object key) {
56  		return map.get(key);
57  	}
58  
59  	/**
60  	 * Returns <code>true</code> if an entry with the given key is available
61  	 * 
62  	 * @see java.util.Map#containsKey(java.lang.Object)
63  	 */
64  	public boolean containsKey(Object key) {
65  		return map.containsKey(key);
66  	}
67  
68  	/**
69  	 * @see java.util.Map#put(java.lang.Object, java.lang.Object)
70  	 */
71  	synchronized public V put(K key, V value) {
72  		V result = map.put(key, value);
73  		latchFor(key).countDown();
74  		return result;
75  	}
76  
77  	/**
78  	 * @see ca.uhn.hl7v2.concurrent.BlockingMap#give(java.lang.Object,
79  	 *      java.lang.Object)
80  	 */
81  	synchronized public boolean give(K key, V value) {
82  		if (!latches.containsKey(key)) {
83  			return false;
84  		}
85  		put(key, value);
86  		return true;
87  	}
88  
89  	public V take(K key) throws InterruptedException {
90  		latchFor(key).await();
91  		latches.remove(key);
92  		return map.remove(key); // likely to fail there are n > 1 consumers
93  	}
94  	
95  
96  	public Future<V> asyncTake(final K key) {
97  		latchFor(key);
98  		return executor.submit(() -> take(key));
99  	}
100 
101 	public V poll(K key, long timeout, TimeUnit unit)
102 			throws InterruptedException {
103 		if (latchFor(key).await(timeout, unit)) {
104 			latches.remove(key);
105 			return map.remove(key);
106 		}
107 		return null;
108 	}
109 	
110 	public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
111 		latchFor(key);
112 		return executor.submit(() -> poll(key, timeout, unit));
113 	}
114 	
115 
116 	/**
117 	 * Returns true if no entry is available for consumers
118 	 * 
119 	 * @see java.util.Map#isEmpty()
120 	 */
121 	public boolean isEmpty() {
122 		return map.isEmpty();
123 	}
124 
125 	/**
126 	 * Returns the number of available values
127 	 * 
128 	 * @see java.util.Map#size()
129 	 */
130 	public int size() {
131 		return map.size();
132 	}
133 
134 	/**
135 	 * Removes an entry, regardless whether a value has been set or not. Waiting
136 	 * consumers will receive a null value.
137 	 * 
138 	 * @see java.util.Map#remove(java.lang.Object)
139 	 */
140 	synchronized public V remove(Object key) {
141 		V result = map.remove(key);
142 		CountDownLatch latch = latches.remove(key);
143 		if (latch != null)
144 			latch.countDown();
145 		return result;
146 	}
147 
148 	/**
149 	 * Clears all existing entries. Waiting consumers will receive a null value
150 	 * for each removed entry.
151 	 * 
152 	 * @see java.util.Map#clear()
153 	 */
154 	public void clear() {
155 		for (K key : latches.keySet()) {
156 			remove(key);
157 		}
158 	}
159 
160 	public Collection<V> values() {
161 		return map.values();
162 	}
163 
164 	public Set<java.util.Map.Entry<K, V>> entrySet() {
165 		return map.entrySet();
166 	}
167 
168 	public void putAll(Map<? extends K, ? extends V> t) {
169 		for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
170 			put(entry.getKey(), entry.getValue());
171 		}
172 	}
173 
174 	public boolean containsValue(Object value) {
175 		return map.containsValue(value);
176 	}
177 
178 	private synchronized CountDownLatch latchFor(K key) {
179 		CountDownLatch latch = latches.get(key);
180 		if (latch == null) {
181 			latch = new CountDownLatch(1);
182 			latches.put(key, latch);
183 		}
184 		return latch;
185 	}
186 
187 }