1 package ca.uhn.hl7v2.hoh.raw.client;
2
3 import java.io.IOException;
4 import java.net.Socket;
5 import java.net.URL;
6 import java.text.SimpleDateFormat;
7 import java.util.ArrayList;
8 import java.util.Date;
9 import java.util.IdentityHashMap;
10 import java.util.Iterator;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Map.Entry;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.ScheduledExecutorService;
16 import java.util.concurrent.TimeUnit;
17
18 import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
19 import ca.uhn.hl7v2.hoh.util.Validate;
20
21
22
23
24
25
26
27
28
29
30
31
32 public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
33
34
35
36
37 public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
38
39 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
40
41 private final ScheduledExecutorService myExecutorService;
42 private final Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<>();
43 private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
44 private boolean myReapingScheduled;
45 private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
46
47
48
49
50 public HohRawClientMultithreaded() {
51 myExecutorService = Executors.newScheduledThreadPool(1);
52 }
53
54
55
56
57
58
59
60
61
62
63
64
65 public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
66 this();
67
68 setHost(theHost);
69 setPort(thePort);
70 setUriPath(thePath);
71 }
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
87 super(theHost, thePort, theUriPath);
88 Validate.notNull(theExecutorService, "executorService");
89
90 myExecutorService = theExecutorService;
91 }
92
93
94
95
96
97
98
99 public HohRawClientMultithreaded(URL theUrl) {
100 this();
101 setUrl(theUrl);
102 }
103
104
105
106
107
108
109
110
111
112 public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
113 super(theUrl);
114 Validate.notNull(theExecutorService, "executorService");
115
116 myExecutorService = theExecutorService;
117 }
118
119 @Override
120 protected synchronized Socket provideSocket() throws IOException {
121 Socket retVal;
122 if (myIdleSocketsToTimeBecameIdle.size() == 0) {
123 ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
124 retVal = connect();
125 } else {
126 retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
127 myIdleSocketsToTimeBecameIdle.remove(retVal);
128 if (retVal.isClosed()) {
129 ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort());
130 retVal = connect();
131 } else {
132 ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort());
133 }
134 }
135 return retVal;
136 }
137
138
139
140
141
142 @Override
143 protected synchronized void returnSocket(Socket theSocket) {
144 if (theSocket.isClosed()) {
145 return;
146 }
147
148 long now = System.currentTimeMillis();
149
150
151
152 if (ourLog.isDebugEnabled()) {
153 if (mySocketTimeout == -1) {
154 ourLog.debug("Returning socket, will not attempt to reap");
155 } else {
156 ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
157 }
158 }
159
160 myIdleSocketsToTimeBecameIdle.put(theSocket, now);
161 scheduleReaping();
162 }
163
164 private void scheduleReaping() {
165 long now = System.currentTimeMillis();
166 if (myReapingScheduled) {
167 ourLog.debug("Reaping already scheduled");
168 return;
169 }
170
171 if (myIdleSocketsToTimeBecameIdle.size() < 1) {
172 return;
173 }
174
175 if (mySocketTimeout == -1) {
176 return;
177 }
178
179 long earliestReapingTime = Long.MAX_VALUE;
180 for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
181 long nextReapingTime = next + mySocketTimeout;
182 if (nextReapingTime < earliestReapingTime) {
183 earliestReapingTime = nextReapingTime;
184 }
185 }
186
187 long delay = earliestReapingTime - now;
188 if (ourLog.isDebugEnabled()) {
189 ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
190 }
191
192 myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
193 myReapingScheduled = true;
194 }
195
196
197
198
199 public long getSocketTimeout() {
200 return mySocketTimeout;
201 }
202
203
204
205
206 public synchronized void setSocketTimeout(long theSocketTimeout) {
207 if (mySocketTimeout < -1) {
208 throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
209 }
210 mySocketTimeout = theSocketTimeout;
211 myReapingScheduled = false;
212 scheduleReaping();
213 }
214
215 private class TimeoutTask implements Runnable {
216 public void run() {
217
218 if (mySocketTimeout == -1) {
219 return;
220 }
221
222 ourLog.debug("Beginning socket reaping pass");
223 try {
224
225 List<Socket> socketsToClose = new ArrayList<>();
226 long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
227 synchronized (HohRawClientMultithreaded.this) {
228
229 for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
230 Entry<Socket, Long> nextEntry = iter.next();
231 if (nextEntry.getValue() <= closeIfActiveBefore) {
232 Socket key = nextEntry.getKey();
233 socketsToClose.add(key);
234 ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
235 iter.remove();
236 } else {
237 if (ourLog.isDebugEnabled()) {
238 ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
239 }
240 }
241 }
242
243 myReapingScheduled = false;
244 scheduleReaping();
245 }
246
247 for (Socket next : socketsToClose) {
248 closeSocket(next);
249 }
250 } catch (Throwable e) {
251 ourLog.error("Failure during reaper pass", e);
252 }
253 }
254 }
255
256 }