001package ca.uhn.hl7v2.hoh.llp;
002
003import static org.junit.Assert.fail;
004
005import java.io.IOException;
006import java.io.OutputStream;
007import java.io.PushbackInputStream;
008import java.net.InetAddress;
009import java.net.InetSocketAddress;
010import java.net.ServerSocket;
011import java.net.Socket;
012import java.net.SocketTimeoutException;
013import java.util.Arrays;
014import java.util.LinkedList;
015import java.util.Map;
016import java.util.concurrent.CountDownLatch;
017
018import ca.uhn.hl7v2.hoh.api.IAuthorizationServerCallback;
019import ca.uhn.hl7v2.hoh.encoder.EncodingStyle;
020import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpRequestDecoder;
021import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpResponseEncoder;
022import ca.uhn.hl7v2.hoh.sockets.CustomCertificateTlsSocketFactory;
023import ca.uhn.hl7v2.hoh.sockets.ISocketFactory;
024import ca.uhn.hl7v2.hoh.sockets.StandardSocketFactory;
025import ca.uhn.hl7v2.model.Message;
026import ca.uhn.hl7v2.parser.GenericParser;
027
028public class ServerSocketThreadForTesting extends Thread {
029        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ServerSocketThreadForTesting.class);
030
031        private boolean myCloseAfterEachMessage;
032        private int myConnectionCount = 0;
033        private String myContentType;
034        private boolean myDone;
035        private EncodingStyle myEncoding;
036        private boolean myGZipResponse;
037        private CountDownLatch myLatch = new CountDownLatch(1);
038        private String myMessage;
039        private int myPort;
040        private Message myReply;
041        private String myRequestUri;
042        private IAuthorizationServerCallback myServerAuthCallback;
043        private ServerSocket myServerSocket;
044        private boolean mySimulateOneSecondPauseInChunkedEncoding;
045        private ISocketFactory mySocketFactory;
046        private LinkedList<Long> myResponseDelays = new LinkedList<Long>();
047
048        private boolean myCloseNormallyAfterEachMessage;
049
050        public void setResponseDelays(Long... theResponseDelays) {
051                myResponseDelays = new LinkedList<Long>(Arrays.asList(theResponseDelays));
052        }
053        
054        public ServerSocketThreadForTesting(int thePort) {
055                myPort = thePort;
056                mySocketFactory = new StandardSocketFactory();
057        }
058
059        public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback) {
060                myPort = thePort;
061                myServerAuthCallback = theServerAuthCallback;
062                mySocketFactory = new StandardSocketFactory();
063        }
064
065        public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback, ISocketFactory theSocketFactory) {
066                myPort = thePort;
067                myServerAuthCallback = theServerAuthCallback;
068                mySocketFactory = theSocketFactory;
069        }
070
071        public void done() {
072                myDone = true;
073        }
074
075        /**
076         * @return the connectionCount
077         */
078        public int getConnectionCount() {
079                return myConnectionCount;
080        }
081
082        /**
083         * @return the contentType
084         */
085        public String getContentType() {
086                return myContentType;
087        }
088
089        /**
090         * @return the encoding
091         */
092        public EncodingStyle getEncoding() {
093                return myEncoding;
094        }
095
096        /**
097         * @return the latch
098         */
099        public CountDownLatch getLatch() {
100                return myLatch;
101        }
102
103        /**
104         * @return the message
105         */
106        public String getMessage() {
107                return myMessage;
108        }
109
110        /**
111         * @return the reply
112         */
113        public Message getReply() {
114                return myReply;
115        }
116
117        public String getRequestUri() {
118                return myRequestUri;
119        }
120
121        @Override
122        public void run() {
123
124                ourLog.info("Starting server on {}", myPort);
125
126                myConnectionCount = 0;
127
128                Exception ex = null;
129                try {
130                        myServerSocket = mySocketFactory.createServerSocket();
131                        myServerSocket.bind(new InetSocketAddress((InetAddress) null, myPort), 50);
132
133                        // myServerSocket = new ServerSocket(myPort);
134                        myServerSocket.setSoTimeout(1000);
135
136                        while (!myDone) {
137                                try {
138                                        ourLog.trace("Going to accept()");
139                                        Socket newSocket = myServerSocket.accept();
140                                        myConnectionCount++;
141                                        newSocket.setSoTimeout(1000);
142                                        ourLog.info("New socket: {}", newSocket.getInetAddress().toString());
143                                        TestSocketThread t = new TestSocketThread(newSocket);
144                                        t.start();
145                                } catch (SocketTimeoutException e) {
146                                        ourLog.trace("No new connection");
147                                }
148                                myLatch.countDown();
149                        }
150
151                } catch (Exception e) {
152                        ourLog.error("Weird exception!", e);
153                        ex = e;
154                }
155
156                ourLog.info("Shutting down, done is {}", myDone);
157
158                try {
159                        myServerSocket.close();
160                } catch (IOException e) {
161                        ourLog.error("Failed to close", e);
162                }
163
164                if (ex != null) {
165                        fail(ex.getMessage());
166                }
167
168        }
169
170        public void setCloseUnexpectedlyAfterEachMessage() {
171                myCloseAfterEachMessage = true;
172        }
173
174        /**
175         * @param theGZipResponse
176         *            the gZipResponse to set
177         */
178        public void setGZipResponse(boolean theGZipResponse) {
179                myGZipResponse = theGZipResponse;
180        }
181
182        /**
183         * @param theMessage
184         *            the message to set
185         */
186        public void setMessage(String theMessage) {
187                myMessage = theMessage;
188        }
189
190        /**
191         * @param theReply
192         *            the reply to set
193         */
194        public void setReply(Message theReply) {
195                myReply = theReply;
196        }
197
198        public void setServerSockewtFactory(CustomCertificateTlsSocketFactory theServerSocketFactory) {
199                mySocketFactory = theServerSocketFactory;
200        }
201
202        public void setSimulateOneSecondPauseInChunkedEncoding(boolean theB) {
203                mySimulateOneSecondPauseInChunkedEncoding = theB;
204        }
205
206        public class TestSocketThread extends Thread {
207
208                private Socket mySocket;
209
210                public TestSocketThread(Socket theSocket) {
211                        mySocket = theSocket;
212                }
213
214                @Override
215                public void run() {
216
217                        ourLog.info("Starting socket reader");
218                        try {
219                                PushbackInputStream is = new PushbackInputStream(mySocket.getInputStream());
220                                OutputStream os = mySocket.getOutputStream();
221                                while (!myDone) {
222
223                                        try {
224                                                int nextChar = is.read();
225                                                ourLog.info("Read: " + nextChar);
226                                                if (nextChar > 0) {
227                                                        is.unread(nextChar);
228                                                }
229
230                                        } catch (SocketTimeoutException e) {
231                                                // ignore
232                                        }
233
234                                        if (is.available() > 0) {
235                                                ourLog.info("Socket reader has data");
236
237                                                // byte[] bis =
238                                                // IOUtils.readInputStreamIntoByteArraWhileDataAvailable(is);
239                                                // ourLog.info("Received input:\n" + new String(bis,
240                                                // HTTPUtils.DEFAULT_CHARSET));
241
242                                                Hl7OverHttpRequestDecoder d = new Hl7OverHttpRequestDecoder();
243                                                d.setAuthorizationCallback(myServerAuthCallback);
244                                                d.readHeadersAndContentsFromInputStreamAndDecode(is);
245                                                myMessage = d.getMessage();
246                                                myContentType = d.getContentType();
247                                                myEncoding = d.getEncodingStyle();
248                                                myRequestUri = d.getPathRaw();
249
250                                                Message parsedMessage = GenericParser.getInstanceWithNoValidation().parse(myMessage);
251                                                myReply = parsedMessage.generateACK();
252
253                                                synchronized (myResponseDelays) {
254                                                        if (myResponseDelays.size() > 0) {
255                                                                Long millis = myResponseDelays.removeFirst();
256                                                                ourLog.info("Sleeping for {}ms", millis);
257                                                                Thread.sleep(millis);
258                                                        }
259                                                }
260                                                
261                                                Hl7OverHttpResponseEncoder e = new Hl7OverHttpResponseEncoder();
262                                                e.setMessage(myReply.encode());
263                                                e.setGzipData(myGZipResponse);
264
265                                                if (myCloseNormallyAfterEachMessage) {
266                                                        e.setAddConnectionCloseHeader(true);
267                                                }
268                                                
269                                                if (mySimulateOneSecondPauseInChunkedEncoding) {
270                                                        e.encode();
271                                                        e.getHeaders().remove("Content-Length");
272                                                        e.getHeaders().put("Transfer-Encoding", "chunked");
273
274                                                        OutputStream tempOs = os;
275                                                        tempOs.write("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
276                                                        for (Map.Entry<String, String> next : e.getHeaders().entrySet()) {
277                                                                String nextHeader = next.getKey() + ": " + next.getValue();
278                                                                ourLog.debug("Sending response header - " + nextHeader);
279                                                                tempOs.write((nextHeader + "\r\n").getBytes("ISO-8859-1"));
280                                                        }
281
282                                                        tempOs.write("\r\n".getBytes("ISO-8859-1"));
283
284                                                        byte[] bytes = e.getData();
285                                                        int halfLength = bytes.length / 2;
286
287                                                        String chunkLength = Integer.toHexString(halfLength);
288                                                        ourLog.debug("Sending chunk length: {}", halfLength);
289
290                                                        tempOs.write(chunkLength.getBytes("ISO-8859-1"));
291                                                        tempOs.write("\r\n".getBytes("ISO-8859-1"));
292                                                        tempOs.write(bytes, 0, halfLength);
293                                                        tempOs.write("\r\n".getBytes("ISO-8859-1"));
294                                                        tempOs.flush();
295
296                                                        Thread.sleep(1000);
297
298                                                        int remaining = bytes.length - halfLength;
299
300                                                        String remChunkLength = Integer.toHexString(remaining);
301                                                        ourLog.debug("Sending chunk length: {}", remaining);
302                                                        byte[] bytesToSend = remChunkLength.getBytes("ISO-8859-1");
303                                                        ourLog.debug("Sending bytes: {}", bytesToSend);
304                                                        tempOs.write(bytesToSend);
305                                                        tempOs.write("\r\n".getBytes("ISO-8859-1"));
306                                                        tempOs.write(bytes, halfLength, remaining);
307                                                        tempOs.write("\r\n".getBytes("ISO-8859-1"));
308
309                                                        tempOs.write("0\r\n\r\n\r\n".getBytes("ISO-8859-1"));
310                                                        tempOs.flush();
311
312                                                } else {
313
314                                                        e.encodeToOutputStream(os);
315
316                                                }
317
318                                        } else {
319                                                ourLog.trace("Socket reader has NO data");
320                                                try {
321                                                        Thread.sleep(100);
322                                                } catch (Exception e) {
323                                                        // ignore
324                                                }
325                                        }
326
327                                        if (myCloseAfterEachMessage) {
328                                                ourLog.info("Closing incoming socket...");
329                                                mySocket.close();
330                                                break;
331                                        }
332
333                                        if (myCloseNormallyAfterEachMessage) {
334                                                ourLog.info("Closing incoming socket...");
335                                                mySocket.close();
336                                                break;
337                                        }
338                                        
339                                }
340                        } catch (Exception e) {
341                                ourLog.info("Failed!", e);
342                                fail(e.getMessage());
343                        }
344
345                        ourLog.info("Shutting down socket reader");
346
347                }
348
349        }
350
351        public void setCloseNormallyWithHeaderAfterEachMessage() {
352                myCloseNormallyAfterEachMessage=true;
353        }
354
355}