1 package ca.uhn.hl7v2.hoh.raw.client;
2
3 import java.io.IOException;
4 import java.net.Socket;
5 import java.net.URL;
6 import java.text.SimpleDateFormat;
7 import java.util.ArrayList;
8 import java.util.Date;
9 import java.util.IdentityHashMap;
10 import java.util.Iterator;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Map.Entry;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.ScheduledExecutorService;
16 import java.util.concurrent.TimeUnit;
17
18 import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
19 import ca.uhn.hl7v2.hoh.util.Validate;
20
21
22
23
24
25
26
27
28
29
30
31
32 public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
33
34
35
36
37 public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
38
39 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
40
41 private final ScheduledExecutorService myExecutorService;
42 private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>();
43 private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
44 private boolean myReapingScheduled;
45 private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
46
47
48
49
50 public HohRawClientMultithreaded() {
51 myExecutorService = Executors.newScheduledThreadPool(1);
52 }
53
54
55
56
57
58
59
60
61
62
63
64
65 public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
66 this();
67
68 setHost(theHost);
69 setPort(thePort);
70 setUriPath(thePath);
71 }
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
87 super(theHost, thePort, theUriPath);
88 Validate.notNull(theExecutorService, "executorService");
89
90 myExecutorService = theExecutorService;
91 }
92
93
94
95
96
97
98
99
100
101 public HohRawClientMultithreaded(URL theUrl) {
102 this();
103 setUrl(theUrl);
104 }
105
106
107
108
109
110
111
112
113
114 public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
115 super(theUrl);
116 Validate.notNull(theExecutorService, "executorService");
117
118 myExecutorService = theExecutorService;
119 }
120
121 @Override
122 protected synchronized Socket provideSocket() throws IOException {
123 Socket retVal;
124 if (myIdleSocketsToTimeBecameIdle.size() == 0) {
125 ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
126 retVal = connect();
127 } else {
128 retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
129 myIdleSocketsToTimeBecameIdle.remove(retVal);
130 if (retVal.isClosed()) {
131 ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort());
132 retVal = connect();
133 } else {
134 ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort());
135 }
136 }
137 return retVal;
138 }
139
140
141
142
143
144 @Override
145 protected synchronized void returnSocket(Socket theSocket) {
146 if (theSocket.isClosed()) {
147 return;
148 }
149
150 long now = System.currentTimeMillis();
151
152
153
154 if (ourLog.isDebugEnabled()) {
155 if (mySocketTimeout == -1) {
156 ourLog.debug("Returning socket, will not attempt to reap");
157 } else {
158 ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
159 }
160 }
161
162 myIdleSocketsToTimeBecameIdle.put(theSocket, now);
163 scheduleReaping();
164 }
165
166 private void scheduleReaping() {
167 long now = System.currentTimeMillis();
168 if (myReapingScheduled) {
169 ourLog.debug("Reaping already scheduled");
170 return;
171 }
172
173 if (myIdleSocketsToTimeBecameIdle.size() < 1) {
174 return;
175 }
176
177 if (mySocketTimeout == -1) {
178 return;
179 }
180
181 long earliestReapingTime = Long.MAX_VALUE;
182 for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
183 long nextReapingTime = next + mySocketTimeout;
184 if (nextReapingTime < earliestReapingTime) {
185 earliestReapingTime = nextReapingTime;
186 }
187 }
188
189 long delay = earliestReapingTime - now;
190 if (ourLog.isDebugEnabled()) {
191 ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
192 }
193
194 myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
195 myReapingScheduled = true;
196 }
197
198
199
200
201 public long getSocketTimeout() {
202 return mySocketTimeout;
203 }
204
205
206
207
208 public synchronized void setSocketTimeout(long theSocketTimeout) {
209 if (mySocketTimeout < -1) {
210 throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
211 }
212 mySocketTimeout = theSocketTimeout;
213 myReapingScheduled = false;
214 scheduleReaping();
215 }
216
217 private class TimeoutTask implements Runnable {
218 public void run() {
219
220 if (mySocketTimeout == -1) {
221 return;
222 }
223
224 ourLog.debug("Beginning socket reaping pass");
225 try {
226
227 List<Socket> socketsToClose = new ArrayList<Socket>();
228 long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
229 synchronized (HohRawClientMultithreaded.this) {
230
231 for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
232 Entry<Socket, Long> nextEntry = iter.next();
233 if (nextEntry.getValue() <= closeIfActiveBefore) {
234 Socket key = nextEntry.getKey();
235 socketsToClose.add(key);
236 ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
237 iter.remove();
238 } else {
239 if (ourLog.isDebugEnabled()) {
240 ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
241 }
242 }
243 }
244
245 myReapingScheduled = false;
246 scheduleReaping();
247 }
248
249 for (Socket next : socketsToClose) {
250 closeSocket(next);
251 }
252 } catch (Throwable e) {
253 ourLog.error("Failure during reaper pass", e);
254 }
255 }
256 }
257
258 }