View Javadoc

1   package ca.uhn.hl7v2.hoh.raw.client;
2   
3   import java.io.IOException;
4   import java.net.Socket;
5   import java.net.URL;
6   import java.text.SimpleDateFormat;
7   import java.util.ArrayList;
8   import java.util.Date;
9   import java.util.IdentityHashMap;
10  import java.util.Iterator;
11  import java.util.List;
12  import java.util.Map;
13  import java.util.Map.Entry;
14  import java.util.concurrent.Executors;
15  import java.util.concurrent.ScheduledExecutorService;
16  import java.util.concurrent.TimeUnit;
17  
18  import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
19  import ca.uhn.hl7v2.hoh.util.Validate;
20  
21  /**
22   * <p>
23   * Raw message sender using the HL7 over HTTP specification which uses a
24   * {@link ScheduledExecutorService} to provide advanced functionality such as
25   * persistent connections which time out and close automatically.
26   * </p>
27   * <p>
28   * This connector uses an executor service which can start worker threads, so
29   * use caution if embedding within a J2EE container.
30   * </p>
31   */
32  public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
33  
34  	/**
35  	 * Default {@link #setSocketTimeout(long) Socket Timeout}, 10000ms
36  	 */
37  	public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
38  
39  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
40  
41  	private final ScheduledExecutorService myExecutorService;
42  	private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>();
43  	private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
44  	private boolean myReapingScheduled;
45  	private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
46  
47  	/**
48  	 * Constructor
49  	 */
50  	public HohRawClientMultithreaded() {
51  		myExecutorService = Executors.newScheduledThreadPool(1);
52  	}
53  
54  	/**
55  	 * Constructor
56  	 * 
57  	 * @param theHost
58  	 *            The HOST (name/address). E.g. "192.168.1.1"
59  	 * @param thePort
60  	 *            The PORT. E.g. "8080"
61  	 * @param thePath
62  	 *            The path being requested (must either be blank or start with
63  	 *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
64  	 */
65  	public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
66  		this();
67  
68  		setHost(theHost);
69  		setPort(thePort);
70  		setUriPath(thePath);
71  	}
72  
73  	/**
74  	 * Constructor
75  	 * 
76  	 * @param theHost
77  	 *            The HOST (name/address). E.g. "192.168.1.1"
78  	 * @param thePort
79  	 *            The PORT. E.g. "8080"
80  	 * @param theUriPath
81  	 *            The URI path being requested (must either be blank or start with
82  	 *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
83  	 * @param theExecutorService
84  	 *            The executor service to use for detecting stale sockets
85  	 */
86  	public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
87  		super(theHost, thePort, theUriPath);
88  		Validate.notNull(theExecutorService, "executorService");
89  
90  		myExecutorService = theExecutorService;
91  	}
92  
93  	/**
94  	 * Constructor
95  	 * 
96  	 * @param theUrl
97  	 *            The URL to connect to
98  	 * @param theExecutorService
99  	 *            The executor service to use for detecting stale sockets
100 	 */
101 	public HohRawClientMultithreaded(URL theUrl) {
102 		this();
103 		setUrl(theUrl);
104 	}
105 
106 	/**
107 	 * Constructor
108 	 * 
109 	 * @param theUrl
110 	 *            The URL to connect to
111 	 * @param theExecutorService
112 	 *            The executor service to use for detecting stale sockets
113 	 */
114 	public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
115 		super(theUrl);
116 		Validate.notNull(theExecutorService, "executorService");
117 
118 		myExecutorService = theExecutorService;
119 	}
120 
121 	@Override
122 	protected synchronized Socket provideSocket() throws IOException {
123 		Socket retVal;
124 		if (myIdleSocketsToTimeBecameIdle.size() == 0) {
125 			ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
126 			retVal = connect();
127 		} else {
128 			retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
129 			myIdleSocketsToTimeBecameIdle.remove(retVal);
130 			if (retVal.isClosed()) {
131 				ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort());
132 				retVal = connect();
133 			} else {
134 				ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort());
135 			}
136 		}
137 		return retVal;
138 	}
139 
140 	/**
141 	 * Returns a socket to the pool. If the socket is closed, it will
142 	 * not be returned.
143 	 */
144 	@Override
145 	protected synchronized void returnSocket(Socket theSocket) {
146 		if (theSocket.isClosed()) {
147 			return;
148 		}
149 		
150 		long now = System.currentTimeMillis();
151 
152 		// TODO: reap immediately if timeout is 0
153 		
154 		if (ourLog.isDebugEnabled()) {
155 			if (mySocketTimeout == -1) {
156 				ourLog.debug("Returning socket, will not attempt to reap");
157 			} else {
158 				ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
159 			}
160 		}
161 
162 		myIdleSocketsToTimeBecameIdle.put(theSocket, now);
163 		scheduleReaping();
164 	}
165 
166 	private void scheduleReaping() {
167 		long now = System.currentTimeMillis();
168 		if (myReapingScheduled) {
169 			ourLog.debug("Reaping already scheduled");
170 			return;
171 		}
172 
173 		if (myIdleSocketsToTimeBecameIdle.size() < 1) {
174 			return;
175 		}
176 
177 		if (mySocketTimeout == -1) {
178 			return;
179 		}
180 		
181 		long earliestReapingTime = Long.MAX_VALUE;
182 		for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
183 			long nextReapingTime = next + mySocketTimeout;
184 			if (nextReapingTime < earliestReapingTime) {
185 				earliestReapingTime = nextReapingTime;
186 			}
187 		}
188 
189 		long delay = earliestReapingTime - now;
190 		if (ourLog.isDebugEnabled()) {
191 			ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
192 		}
193 
194 		myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
195 		myReapingScheduled = true;
196 	}
197 
198 	/**
199 	 * {@inheritDoc}
200 	 */
201 	public long getSocketTimeout() {
202 		return mySocketTimeout;
203 	}
204 
205 	/**
206 	 * {@inheritDoc}
207 	 */
208 	public synchronized void setSocketTimeout(long theSocketTimeout) {
209 		if (mySocketTimeout < -1) {
210 			throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
211 		}
212 		mySocketTimeout = theSocketTimeout;
213 		myReapingScheduled = false;
214 		scheduleReaping();
215 	}
216 
217 	private class TimeoutTask implements Runnable {
218 		public void run() {
219 
220 			if (mySocketTimeout == -1) {
221 				return;
222 			}
223 			
224 			ourLog.debug("Beginning socket reaping pass");
225 			try {
226 
227 				List<Socket> socketsToClose = new ArrayList<Socket>();
228 				long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
229 				synchronized (HohRawClientMultithreaded.this) {
230 
231 					for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
232 						Entry<Socket, Long> nextEntry = iter.next();
233 						if (nextEntry.getValue() <= closeIfActiveBefore) {
234 							Socket key = nextEntry.getKey();
235 							socketsToClose.add(key);
236 							ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
237 							iter.remove();
238 						} else {
239 							if (ourLog.isDebugEnabled()) {
240 								ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
241 							}
242 						}
243 					}
244 
245 					myReapingScheduled = false;
246 					scheduleReaping();
247 				}
248 
249 				for (Socket next : socketsToClose) {
250 					closeSocket(next);
251 				}
252 			} catch (Throwable e) {
253 				ourLog.error("Failure during reaper pass", e);
254 			}
255 		}
256 	}
257 
258 }