View Javadoc
1   package ca.uhn.hl7v2.hoh.raw.client;
2   
3   import java.io.IOException;
4   import java.net.Socket;
5   import java.net.URL;
6   import java.text.SimpleDateFormat;
7   import java.util.ArrayList;
8   import java.util.Date;
9   import java.util.IdentityHashMap;
10  import java.util.Iterator;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.Map.Entry;
14  import java.util.concurrent.Executors;
15  import java.util.concurrent.ScheduledExecutorService;
16  import java.util.concurrent.TimeUnit;
17  
18  import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
19  import ca.uhn.hl7v2.hoh.util.Validate;
20  
21  /**
22   * <p>
23   * Raw message sender using the HL7 over HTTP specification which uses a
24   * {@link ScheduledExecutorService} to provide advanced functionality such as
25   * persistent connections which time out and close automatically.
26   * </p>
27   * <p>
28   * This connector uses an executor service which can start worker threads, so
29   * use caution if embedding within a J2EE container.
30   * </p>
31   */
32  public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
33  
34  	/**
35  	 * Default {@link #setSocketTimeout(long) Socket Timeout}, 10000ms
36  	 */
37  	public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
38  
39  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
40  
41  	private final ScheduledExecutorService myExecutorService;
42  	private final Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<>();
43  	private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
44  	private boolean myReapingScheduled;
45  	private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
46  
47  	/**
48  	 * Constructor
49  	 */
50  	public HohRawClientMultithreaded() {
51  		myExecutorService = Executors.newScheduledThreadPool(1);
52  	}
53  
54  	/**
55  	 * Constructor
56  	 * 
57  	 * @param theHost
58  	 *            The HOST (name/address). E.g. "192.168.1.1"
59  	 * @param thePort
60  	 *            The PORT. E.g. "8080"
61  	 * @param thePath
62  	 *            The path being requested (must either be blank or start with
63  	 *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
64  	 */
65  	public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
66  		this();
67  
68  		setHost(theHost);
69  		setPort(thePort);
70  		setUriPath(thePath);
71  	}
72  
73  	/**
74  	 * Constructor
75  	 * 
76  	 * @param theHost
77  	 *            The HOST (name/address). E.g. "192.168.1.1"
78  	 * @param thePort
79  	 *            The PORT. E.g. "8080"
80  	 * @param theUriPath
81  	 *            The URI path being requested (must either be blank or start with
82  	 *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
83  	 * @param theExecutorService
84  	 *            The executor service to use for detecting stale sockets
85  	 */
86  	public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
87  		super(theHost, thePort, theUriPath);
88  		Validate.notNull(theExecutorService, "executorService");
89  
90  		myExecutorService = theExecutorService;
91  	}
92  
93  	/**
94  	 * Constructor
95  	 * 
96  	 * @param theUrl
97  	 *            The URL to connect to
98  	 */
99  	public HohRawClientMultithreaded(URL theUrl) {
100 		this();
101 		setUrl(theUrl);
102 	}
103 
104 	/**
105 	 * Constructor
106 	 * 
107 	 * @param theUrl
108 	 *            The URL to connect to
109 	 * @param theExecutorService
110 	 *            The executor service to use for detecting stale sockets
111 	 */
112 	public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
113 		super(theUrl);
114 		Validate.notNull(theExecutorService, "executorService");
115 
116 		myExecutorService = theExecutorService;
117 	}
118 
119 	@Override
120 	protected synchronized Socket provideSocket() throws IOException {
121 		Socket retVal;
122 		if (myIdleSocketsToTimeBecameIdle.size() == 0) {
123 			ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
124 			retVal = connect();
125 		} else {
126 			retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
127 			myIdleSocketsToTimeBecameIdle.remove(retVal);
128 			if (retVal.isClosed()) {
129 				ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort());
130 				retVal = connect();
131 			} else {
132 				ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort());
133 			}
134 		}
135 		return retVal;
136 	}
137 
138 	/**
139 	 * Returns a socket to the pool. If the socket is closed, it will
140 	 * not be returned.
141 	 */
142 	@Override
143 	protected synchronized void returnSocket(Socket theSocket) {
144 		if (theSocket.isClosed()) {
145 			return;
146 		}
147 		
148 		long now = System.currentTimeMillis();
149 
150 		// TODO: reap immediately if timeout is 0
151 		
152 		if (ourLog.isDebugEnabled()) {
153 			if (mySocketTimeout == -1) {
154 				ourLog.debug("Returning socket, will not attempt to reap");
155 			} else {
156 				ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
157 			}
158 		}
159 
160 		myIdleSocketsToTimeBecameIdle.put(theSocket, now);
161 		scheduleReaping();
162 	}
163 
164 	private void scheduleReaping() {
165 		long now = System.currentTimeMillis();
166 		if (myReapingScheduled) {
167 			ourLog.debug("Reaping already scheduled");
168 			return;
169 		}
170 
171 		if (myIdleSocketsToTimeBecameIdle.size() < 1) {
172 			return;
173 		}
174 
175 		if (mySocketTimeout == -1) {
176 			return;
177 		}
178 		
179 		long earliestReapingTime = Long.MAX_VALUE;
180 		for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
181 			long nextReapingTime = next + mySocketTimeout;
182 			if (nextReapingTime < earliestReapingTime) {
183 				earliestReapingTime = nextReapingTime;
184 			}
185 		}
186 
187 		long delay = earliestReapingTime - now;
188 		if (ourLog.isDebugEnabled()) {
189 			ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
190 		}
191 
192 		myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
193 		myReapingScheduled = true;
194 	}
195 
196 	/**
197 	 * {@inheritDoc}
198 	 */
199 	public long getSocketTimeout() {
200 		return mySocketTimeout;
201 	}
202 
203 	/**
204 	 * {@inheritDoc}
205 	 */
206 	public synchronized void setSocketTimeout(long theSocketTimeout) {
207 		if (mySocketTimeout < -1) {
208 			throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
209 		}
210 		mySocketTimeout = theSocketTimeout;
211 		myReapingScheduled = false;
212 		scheduleReaping();
213 	}
214 
215 	private class TimeoutTask implements Runnable {
216 		public void run() {
217 
218 			if (mySocketTimeout == -1) {
219 				return;
220 			}
221 			
222 			ourLog.debug("Beginning socket reaping pass");
223 			try {
224 
225 				List<Socket> socketsToClose = new ArrayList<>();
226 				long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
227 				synchronized (HohRawClientMultithreaded.this) {
228 
229 					for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
230 						Entry<Socket, Long> nextEntry = iter.next();
231 						if (nextEntry.getValue() <= closeIfActiveBefore) {
232 							Socket key = nextEntry.getKey();
233 							socketsToClose.add(key);
234 							ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
235 							iter.remove();
236 						} else {
237 							if (ourLog.isDebugEnabled()) {
238 								ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
239 							}
240 						}
241 					}
242 
243 					myReapingScheduled = false;
244 					scheduleReaping();
245 				}
246 
247 				for (Socket next : socketsToClose) {
248 					closeSocket(next);
249 				}
250 			} catch (Throwable e) {
251 				ourLog.error("Failure during reaper pass", e);
252 			}
253 		}
254 	}
255 
256 }