001package ca.uhn.hl7v2.hoh.raw.client;
002
003import java.io.IOException;
004import java.net.Socket;
005import java.net.URL;
006import java.text.SimpleDateFormat;
007import java.util.ArrayList;
008import java.util.Date;
009import java.util.IdentityHashMap;
010import java.util.Iterator;
011import java.util.List;
012import java.util.Map;
013import java.util.Map.Entry;
014import java.util.concurrent.Executors;
015import java.util.concurrent.ScheduledExecutorService;
016import java.util.concurrent.TimeUnit;
017
018import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
019import ca.uhn.hl7v2.hoh.util.Validate;
020
021/**
022 * <p>
023 * Raw message sender using the HL7 over HTTP specification which uses a
024 * {@link ScheduledExecutorService} to provide advanced functionality such as
025 * persistent connections which time out and close automatically.
026 * </p>
027 * <p>
028 * This connector uses an executor service which can start worker threads, so
029 * use caution if embedding within a J2EE container.
030 * </p>
031 */
032public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
033
034        /**
035         * Default {@link #setSocketTimeout(long) Socket Timeout}, 10000ms
036         */
037        public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
038
039        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
040
041        private final ScheduledExecutorService myExecutorService;
042        private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>();
043        private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
044        private boolean myReapingScheduled;
045        private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
046
047        /**
048         * Constructor
049         */
050        public HohRawClientMultithreaded() {
051                myExecutorService = Executors.newScheduledThreadPool(1);
052        }
053
054        /**
055         * Constructor
056         * 
057         * @param theHost
058         *            The HOST (name/address). E.g. "192.168.1.1"
059         * @param thePort
060         *            The PORT. E.g. "8080"
061         * @param thePath
062         *            The path being requested (must either be blank or start with
063         *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
064         */
065        public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
066                this();
067
068                setHost(theHost);
069                setPort(thePort);
070                setUriPath(thePath);
071        }
072
073        /**
074         * Constructor
075         * 
076         * @param theHost
077         *            The HOST (name/address). E.g. "192.168.1.1"
078         * @param thePort
079         *            The PORT. E.g. "8080"
080         * @param theUriPath
081         *            The URI path being requested (must either be blank or start with
082         *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
083         * @param theExecutorService
084         *            The executor service to use for detecting stale sockets
085         */
086        public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
087                super(theHost, thePort, theUriPath);
088                Validate.notNull(theExecutorService, "executorService");
089
090                myExecutorService = theExecutorService;
091        }
092
093        /**
094         * Constructor
095         * 
096         * @param theUrl
097         *            The URL to connect to
098         * @param theExecutorService
099         *            The executor service to use for detecting stale sockets
100         */
101        public HohRawClientMultithreaded(URL theUrl) {
102                this();
103                setUrl(theUrl);
104        }
105
106        /**
107         * Constructor
108         * 
109         * @param theUrl
110         *            The URL to connect to
111         * @param theExecutorService
112         *            The executor service to use for detecting stale sockets
113         */
114        public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
115                super(theUrl);
116                Validate.notNull(theExecutorService, "executorService");
117
118                myExecutorService = theExecutorService;
119        }
120
121        @Override
122        protected synchronized Socket provideSocket() throws IOException {
123                Socket retVal;
124                if (myIdleSocketsToTimeBecameIdle.size() == 0) {
125                        ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
126                        retVal = connect();
127                } else {
128                        retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
129                        myIdleSocketsToTimeBecameIdle.remove(retVal);
130                        if (retVal.isClosed()) {
131                                ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort());
132                                retVal = connect();
133                        } else {
134                                ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort());
135                        }
136                }
137                return retVal;
138        }
139
140        /**
141         * Returns a socket to the pool. If the socket is closed, it will
142         * not be returned.
143         */
144        @Override
145        protected synchronized void returnSocket(Socket theSocket) {
146                if (theSocket.isClosed()) {
147                        return;
148                }
149                
150                long now = System.currentTimeMillis();
151
152                // TODO: reap immediately if timeout is 0
153                
154                if (ourLog.isDebugEnabled()) {
155                        if (mySocketTimeout == -1) {
156                                ourLog.debug("Returning socket, will not attempt to reap");
157                        } else {
158                                ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
159                        }
160                }
161
162                myIdleSocketsToTimeBecameIdle.put(theSocket, now);
163                scheduleReaping();
164        }
165
166        private void scheduleReaping() {
167                long now = System.currentTimeMillis();
168                if (myReapingScheduled) {
169                        ourLog.debug("Reaping already scheduled");
170                        return;
171                }
172
173                if (myIdleSocketsToTimeBecameIdle.size() < 1) {
174                        return;
175                }
176
177                if (mySocketTimeout == -1) {
178                        return;
179                }
180                
181                long earliestReapingTime = Long.MAX_VALUE;
182                for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
183                        long nextReapingTime = next + mySocketTimeout;
184                        if (nextReapingTime < earliestReapingTime) {
185                                earliestReapingTime = nextReapingTime;
186                        }
187                }
188
189                long delay = earliestReapingTime - now;
190                if (ourLog.isDebugEnabled()) {
191                        ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
192                }
193
194                myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
195                myReapingScheduled = true;
196        }
197
198        /**
199         * {@inheritDoc}
200         */
201        public long getSocketTimeout() {
202                return mySocketTimeout;
203        }
204
205        /**
206         * {@inheritDoc}
207         */
208        public synchronized void setSocketTimeout(long theSocketTimeout) {
209                if (mySocketTimeout < -1) {
210                        throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
211                }
212                mySocketTimeout = theSocketTimeout;
213                myReapingScheduled = false;
214                scheduleReaping();
215        }
216
217        private class TimeoutTask implements Runnable {
218                public void run() {
219
220                        if (mySocketTimeout == -1) {
221                                return;
222                        }
223                        
224                        ourLog.debug("Beginning socket reaping pass");
225                        try {
226
227                                List<Socket> socketsToClose = new ArrayList<Socket>();
228                                long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
229                                synchronized (HohRawClientMultithreaded.this) {
230
231                                        for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
232                                                Entry<Socket, Long> nextEntry = iter.next();
233                                                if (nextEntry.getValue() <= closeIfActiveBefore) {
234                                                        Socket key = nextEntry.getKey();
235                                                        socketsToClose.add(key);
236                                                        ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
237                                                        iter.remove();
238                                                } else {
239                                                        if (ourLog.isDebugEnabled()) {
240                                                                ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
241                                                        }
242                                                }
243                                        }
244
245                                        myReapingScheduled = false;
246                                        scheduleReaping();
247                                }
248
249                                for (Socket next : socketsToClose) {
250                                        closeSocket(next);
251                                }
252                        } catch (Throwable e) {
253                                ourLog.error("Failure during reaper pass", e);
254                        }
255                }
256        }
257
258}