1 package ca.uhn.hl7v2.concurrent;
2
3 import java.util.Collection;
4 import java.util.Map;
5 import java.util.Set;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ConcurrentMap;
9 import java.util.concurrent.CountDownLatch;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Future;
13 import java.util.concurrent.TimeUnit;
14
15
16
17
18
19
20
21
22
23
24
25
26 public class BlockingHashMap<K, V> implements BlockingMap<K, V> {
27
28 private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();
29 private final ConcurrentMap<K, CountDownLatch> latches = new ConcurrentHashMap<>();
30 private final ExecutorService executor;
31
32 public BlockingHashMap() {
33 this(Executors.newCachedThreadPool());
34 }
35
36 public BlockingHashMap(ExecutorService executor) {
37 super();
38 this.executor = executor;
39 }
40
41
42
43
44
45
46 public Set<K> keySet() {
47 return map.keySet();
48 }
49
50
51
52
53
54
55 public V get(Object key) {
56 return map.get(key);
57 }
58
59
60
61
62
63
64 public boolean containsKey(Object key) {
65 return map.containsKey(key);
66 }
67
68
69
70
71 synchronized public V put(K key, V value) {
72 V result = map.put(key, value);
73 latchFor(key).countDown();
74 return result;
75 }
76
77
78
79
80
81 synchronized public boolean give(K key, V value) {
82 if (!latches.containsKey(key)) {
83 return false;
84 }
85 put(key, value);
86 return true;
87 }
88
89 public V take(K key) throws InterruptedException {
90 latchFor(key).await();
91 latches.remove(key);
92 return map.remove(key);
93 }
94
95
96 public Future<V> asyncTake(final K key) {
97 latchFor(key);
98 return executor.submit(() -> take(key));
99 }
100
101 public V poll(K key, long timeout, TimeUnit unit)
102 throws InterruptedException {
103 if (latchFor(key).await(timeout, unit)) {
104 latches.remove(key);
105 return map.remove(key);
106 }
107 return null;
108 }
109
110 public Future<V> asyncPoll(final K key, final long timeout, final TimeUnit unit) {
111 latchFor(key);
112 return executor.submit(() -> poll(key, timeout, unit));
113 }
114
115
116
117
118
119
120
121 public boolean isEmpty() {
122 return map.isEmpty();
123 }
124
125
126
127
128
129
130 public int size() {
131 return map.size();
132 }
133
134
135
136
137
138
139
140 synchronized public V remove(Object key) {
141 V result = map.remove(key);
142 CountDownLatch latch = latches.remove(key);
143 if (latch != null)
144 latch.countDown();
145 return result;
146 }
147
148
149
150
151
152
153
154 public void clear() {
155 for (K key : latches.keySet()) {
156 remove(key);
157 }
158 }
159
160 public Collection<V> values() {
161 return map.values();
162 }
163
164 public Set<java.util.Map.Entry<K, V>> entrySet() {
165 return map.entrySet();
166 }
167
168 public void putAll(Map<? extends K, ? extends V> t) {
169 for (Entry<? extends K, ? extends V> entry : t.entrySet()) {
170 put(entry.getKey(), entry.getValue());
171 }
172 }
173
174 public boolean containsValue(Object value) {
175 return map.containsValue(value);
176 }
177
178 private synchronized CountDownLatch latchFor(K key) {
179 CountDownLatch latch = latches.get(key);
180 if (latch == null) {
181 latch = new CountDownLatch(1);
182 latches.put(key, latch);
183 }
184 return latch;
185 }
186
187 }