001package ca.uhn.hl7v2.hoh.raw.client; 002 003import java.io.IOException; 004import java.net.Socket; 005import java.net.URL; 006import java.text.SimpleDateFormat; 007import java.util.ArrayList; 008import java.util.Date; 009import java.util.IdentityHashMap; 010import java.util.Iterator; 011import java.util.List; 012import java.util.Map; 013import java.util.Map.Entry; 014import java.util.concurrent.Executors; 015import java.util.concurrent.ScheduledExecutorService; 016import java.util.concurrent.TimeUnit; 017 018import ca.uhn.hl7v2.hoh.api.IClientMultithreaded; 019import ca.uhn.hl7v2.hoh.util.Validate; 020 021/** 022 * <p> 023 * Raw message sender using the HL7 over HTTP specification which uses a 024 * {@link ScheduledExecutorService} to provide advanced functionality such as 025 * persistent connections which time out and close automatically. 026 * </p> 027 * <p> 028 * This connector uses an executor service which can start worker threads, so 029 * use caution if embedding within a J2EE container. 030 * </p> 031 */ 032public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded { 033 034 /** 035 * Default {@link #setSocketTimeout(long) Socket Timeout}, 10000ms 036 */ 037 public static final long DEFAULT_SOCKET_TIMEOUT = 10000; 038 039 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class); 040 041 private final ScheduledExecutorService myExecutorService; 042 private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>(); 043 private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS"); 044 private boolean myReapingScheduled; 045 private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT; 046 047 /** 048 * Constructor 049 */ 050 public HohRawClientMultithreaded() { 051 myExecutorService = Executors.newScheduledThreadPool(1); 052 } 053 054 /** 055 * Constructor 056 * 057 * @param theHost 058 * The HOST (name/address). E.g. "192.168.1.1" 059 * @param thePort 060 * The PORT. E.g. "8080" 061 * @param thePath 062 * The path being requested (must either be blank or start with 063 * '/' and contain a path). E.g. "/Apps/Receiver.jsp" 064 */ 065 public HohRawClientMultithreaded(String theHost, int thePort, String thePath) { 066 this(); 067 068 setHost(theHost); 069 setPort(thePort); 070 setUriPath(thePath); 071 } 072 073 /** 074 * Constructor 075 * 076 * @param theHost 077 * The HOST (name/address). E.g. "192.168.1.1" 078 * @param thePort 079 * The PORT. E.g. "8080" 080 * @param theUriPath 081 * The URI path being requested (must either be blank or start with 082 * '/' and contain a path). E.g. "/Apps/Receiver.jsp" 083 * @param theExecutorService 084 * The executor service to use for detecting stale sockets 085 */ 086 public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) { 087 super(theHost, thePort, theUriPath); 088 Validate.notNull(theExecutorService, "executorService"); 089 090 myExecutorService = theExecutorService; 091 } 092 093 /** 094 * Constructor 095 * 096 * @param theUrl 097 * The URL to connect to 098 * @param theExecutorService 099 * The executor service to use for detecting stale sockets 100 */ 101 public HohRawClientMultithreaded(URL theUrl) { 102 this(); 103 setUrl(theUrl); 104 } 105 106 /** 107 * Constructor 108 * 109 * @param theUrl 110 * The URL to connect to 111 * @param theExecutorService 112 * The executor service to use for detecting stale sockets 113 */ 114 public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) { 115 super(theUrl); 116 Validate.notNull(theExecutorService, "executorService"); 117 118 myExecutorService = theExecutorService; 119 } 120 121 @Override 122 protected synchronized Socket provideSocket() throws IOException { 123 Socket retVal; 124 if (myIdleSocketsToTimeBecameIdle.size() == 0) { 125 ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort()); 126 retVal = connect(); 127 } else { 128 retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next(); 129 myIdleSocketsToTimeBecameIdle.remove(retVal); 130 if (retVal.isClosed()) { 131 ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort()); 132 retVal = connect(); 133 } else { 134 ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort()); 135 } 136 } 137 return retVal; 138 } 139 140 /** 141 * Returns a socket to the pool. If the socket is closed, it will 142 * not be returned. 143 */ 144 @Override 145 protected synchronized void returnSocket(Socket theSocket) { 146 if (theSocket.isClosed()) { 147 return; 148 } 149 150 long now = System.currentTimeMillis(); 151 152 // TODO: reap immediately if timeout is 0 153 154 if (ourLog.isDebugEnabled()) { 155 if (mySocketTimeout == -1) { 156 ourLog.debug("Returning socket, will not attempt to reap"); 157 } else { 158 ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout))); 159 } 160 } 161 162 myIdleSocketsToTimeBecameIdle.put(theSocket, now); 163 scheduleReaping(); 164 } 165 166 private void scheduleReaping() { 167 long now = System.currentTimeMillis(); 168 if (myReapingScheduled) { 169 ourLog.debug("Reaping already scheduled"); 170 return; 171 } 172 173 if (myIdleSocketsToTimeBecameIdle.size() < 1) { 174 return; 175 } 176 177 if (mySocketTimeout == -1) { 178 return; 179 } 180 181 long earliestReapingTime = Long.MAX_VALUE; 182 for (Long next : myIdleSocketsToTimeBecameIdle.values()) { 183 long nextReapingTime = next + mySocketTimeout; 184 if (nextReapingTime < earliestReapingTime) { 185 earliestReapingTime = nextReapingTime; 186 } 187 } 188 189 long delay = earliestReapingTime - now; 190 if (ourLog.isDebugEnabled()) { 191 ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime))); 192 } 193 194 myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS); 195 myReapingScheduled = true; 196 } 197 198 /** 199 * {@inheritDoc} 200 */ 201 public long getSocketTimeout() { 202 return mySocketTimeout; 203 } 204 205 /** 206 * {@inheritDoc} 207 */ 208 public synchronized void setSocketTimeout(long theSocketTimeout) { 209 if (mySocketTimeout < -1) { 210 throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer"); 211 } 212 mySocketTimeout = theSocketTimeout; 213 myReapingScheduled = false; 214 scheduleReaping(); 215 } 216 217 private class TimeoutTask implements Runnable { 218 public void run() { 219 220 if (mySocketTimeout == -1) { 221 return; 222 } 223 224 ourLog.debug("Beginning socket reaping pass"); 225 try { 226 227 List<Socket> socketsToClose = new ArrayList<Socket>(); 228 long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout; 229 synchronized (HohRawClientMultithreaded.this) { 230 231 for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) { 232 Entry<Socket, Long> nextEntry = iter.next(); 233 if (nextEntry.getValue() <= closeIfActiveBefore) { 234 Socket key = nextEntry.getKey(); 235 socketsToClose.add(key); 236 ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue())); 237 iter.remove(); 238 } else { 239 if (ourLog.isDebugEnabled()) { 240 ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining"); 241 } 242 } 243 } 244 245 myReapingScheduled = false; 246 scheduleReaping(); 247 } 248 249 for (Socket next : socketsToClose) { 250 closeSocket(next); 251 } 252 } catch (Throwable e) { 253 ourLog.error("Failure during reaper pass", e); 254 } 255 } 256 } 257 258}