package ca.uhn.hl7v2.hoh.llp;

import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import ca.uhn.hl7v2.hoh.api.IAuthorizationServerCallback;
import ca.uhn.hl7v2.hoh.encoder.EncodingStyle;
import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpRequestDecoder;
import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpResponseEncoder;
import ca.uhn.hl7v2.hoh.sockets.CustomCertificateTlsSocketFactory;
import ca.uhn.hl7v2.hoh.sockets.ISocketFactory;
import ca.uhn.hl7v2.hoh.sockets.StandardSocketFactory;
import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.GenericParser;

/**
 * Test helper thread that binds a server socket on a given port and accepts
 * connections until {@link #done()} is called. Every accepted connection is
 * served by its own {@link TestSocketThread}, which decodes each incoming
 * HL7-over-HTTP request, parses it, and answers with the ACK generated from
 * the parsed message.
 * <p>
 * The most recently decoded request (message, content type, encoding style,
 * request URI) and the most recently generated reply are exposed through the
 * getters. Fields that are written by one thread and read by another are
 * <code>volatile</code> so that the test thread observes the values written
 * by the server/socket threads (and vice versa for {@link #done()}).
 */
public class ServerSocketThreadForTesting extends Thread {
    private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ServerSocketThreadForTesting.class);

    /** Charset used for the hand-written status line, headers and chunk framing. */
    private static final String HTTP_CHARSET_NAME = "ISO-8859-1";

    private volatile boolean myCloseAfterEachMessage;
    private volatile boolean myCloseNormallyAfterEachMessage;
    // Only the accepting thread increments this; volatile is for readers on other threads.
    private volatile int myConnectionCount = 0;
    private volatile String myContentType;
    private volatile boolean myDone;
    private volatile EncodingStyle myEncoding;
    private volatile boolean myGZipResponse;
    private final CountDownLatch myLatch = new CountDownLatch(1);
    private volatile String myMessage;
    private final int myPort;
    private volatile Message myReply;
    private volatile String myRequestUri;
    private IAuthorizationServerCallback myServerAuthCallback;
    private ServerSocket myServerSocket;
    private volatile boolean mySimulateOneSecondPauseInChunkedEncoding;
    private ISocketFactory mySocketFactory;
    // Final so that the monitor used to guard the queue never changes identity.
    private final LinkedList<Long> myResponseDelays = new LinkedList<Long>();

    /**
     * Creates a server on the given port using a {@link StandardSocketFactory}
     * and no authorization callback.
     *
     * @param thePort
     *            the port to bind to
     */
    public ServerSocketThreadForTesting(int thePort) {
        this(thePort, null, new StandardSocketFactory());
    }

    /**
     * Creates a server on the given port using a {@link StandardSocketFactory}.
     *
     * @param thePort
     *            the port to bind to
     * @param theServerAuthCallback
     *            callback handed to the request decoder of every connection
     */
    public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback) {
        this(thePort, theServerAuthCallback, new StandardSocketFactory());
    }

    /**
     * @param thePort
     *            the port to bind to
     * @param theServerAuthCallback
     *            callback handed to the request decoder of every connection
     * @param theSocketFactory
     *            factory used to create the server socket
     */
    public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback, ISocketFactory theSocketFactory) {
        myPort = thePort;
        myServerAuthCallback = theServerAuthCallback;
        mySocketFactory = theSocketFactory;
    }

    /**
     * Replaces the queue of artificial response delays. Each handled request
     * consumes the first remaining entry and sleeps that many milliseconds
     * before the response is written.
     *
     * @param theResponseDelays
     *            delays in milliseconds, in the order they are to be applied
     */
    public void setResponseDelays(Long... theResponseDelays) {
        // Build the copy before taking the lock so a bad argument cannot leave the queue half-updated
        LinkedList<Long> newDelays = new LinkedList<Long>(Arrays.asList(theResponseDelays));
        synchronized (myResponseDelays) {
            myResponseDelays.clear();
            myResponseDelays.addAll(newDelays);
        }
    }

    /**
     * Asks the accept loop and all socket reader loops to stop. The loops
     * notice the flag the next time they evaluate their loop condition.
     */
    public void done() {
        myDone = true;
    }

    /**
     * @return the number of connections accepted since {@link #run()} started
     */
    public int getConnectionCount() {
        return myConnectionCount;
    }

    /**
     * @return the content type of the most recently decoded request
     */
    public String getContentType() {
        return myContentType;
    }

    /**
     * @return the encoding style of the most recently decoded request
     */
    public EncodingStyle getEncoding() {
        return myEncoding;
    }

    /**
     * @return a latch that is released after the first pass through the
     *         accept loop, or when the server thread shuts down (including
     *         after a failed start-up), whichever happens first
     */
    public CountDownLatch getLatch() {
        return myLatch;
    }

    /**
     * @return the most recently decoded request message, or the value given
     *         to {@link #setMessage(String)} if no request arrived since
     */
    public String getMessage() {
        return myMessage;
    }

    /**
     * @return the ACK generated for the most recent request, or the value
     *         given to {@link #setReply(Message)} if no request arrived since
     */
    public Message getReply() {
        return myReply;
    }

    /**
     * @return the raw path of the most recently decoded request
     */
    public String getRequestUri() {
        return myRequestUri;
    }

    /**
     * Binds the server socket and accepts connections until {@link #done()}
     * is called, starting one {@link TestSocketThread} per connection. The
     * accept call times out every second so that the done flag is re-checked.
     */
    @Override
    public void run() {

        ourLog.info("Starting server on {}", myPort);

        myConnectionCount = 0;

        Exception ex = null;
        try {
            myServerSocket = mySocketFactory.createServerSocket();
            myServerSocket.bind(new InetSocketAddress((InetAddress) null, myPort), 50);
            myServerSocket.setSoTimeout(1000);

            while (!myDone) {
                try {
                    ourLog.trace("Going to accept()");
                    Socket newSocket = myServerSocket.accept();
                    myConnectionCount++;
                    newSocket.setSoTimeout(1000);
                    ourLog.info("New socket: {}", newSocket.getInetAddress().toString());
                    new TestSocketThread(newSocket).start();
                } catch (SocketTimeoutException e) {
                    ourLog.trace("No new connection");
                }
                myLatch.countDown();
            }

        } catch (Exception e) {
            ourLog.error("Weird exception!", e);
            ex = e;
        }

        ourLog.info("Shutting down, done is {}", myDone);

        // The socket is still null if the factory itself failed
        if (myServerSocket != null) {
            try {
                myServerSocket.close();
            } catch (IOException e) {
                ourLog.error("Failed to close", e);
            }
        }

        // Never leave a test blocked on the latch if start-up failed before the first accept
        myLatch.countDown();

        if (ex != null) {
            fail(ex.getMessage());
        }

    }

    /**
     * Makes every socket reader close its socket at the end of each loop
     * pass, without announcing the close in the response headers.
     */
    public void setCloseUnexpectedlyAfterEachMessage() {
        myCloseAfterEachMessage = true;
    }

    /**
     * Makes every socket reader request a connection-close header on the
     * response encoder and then close its socket at the end of each loop pass.
     */
    public void setCloseNormallyWithHeaderAfterEachMessage() {
        myCloseNormallyAfterEachMessage = true;
    }

    /**
     * @param theGZipResponse
     *            value passed to the response encoder's gzip setting
     */
    public void setGZipResponse(boolean theGZipResponse) {
        myGZipResponse = theGZipResponse;
    }

    /**
     * @param theMessage
     *            the message to set; overwritten by the next decoded request
     */
    public void setMessage(String theMessage) {
        myMessage = theMessage;
    }

    /**
     * @param theReply
     *            the reply to set; overwritten by the ACK generated for the
     *            next decoded request
     */
    public void setReply(Message theReply) {
        myReply = theReply;
    }

    /**
     * Replaces the factory used to create the server socket. The method name
     * is misspelled but is kept as-is because it is part of the public
     * interface.
     *
     * @param theServerSocketFactory
     *            the factory to use from now on
     */
    public void setServerSockewtFactory(CustomCertificateTlsSocketFactory theServerSocketFactory) {
        mySocketFactory = theServerSocketFactory;
    }

    /**
     * @param theB
     *            if <code>true</code>, responses are written by hand as two
     *            chunks with a one second pause between them
     */
    public void setSimulateOneSecondPauseInChunkedEncoding(boolean theB) {
        mySimulateOneSecondPauseInChunkedEncoding = theB;
    }

    /**
     * Removes and returns the next configured response delay.
     *
     * @return the delay in milliseconds, or <code>null</code> if none is queued
     */
    private Long pollNextResponseDelay() {
        synchronized (myResponseDelays) {
            return myResponseDelays.isEmpty() ? null : myResponseDelays.removeFirst();
        }
    }

    /**
     * Serves a single accepted connection: waits for request data, decodes
     * it, and writes the ACK, until the server is done, the peer closes the
     * connection, or one of the close-after-message modes is active.
     */
    public class TestSocketThread extends Thread {

        private final Socket mySocket;

        public TestSocketThread(Socket theSocket) {
            mySocket = theSocket;
        }

        @Override
        public void run() {

            ourLog.info("Starting socket reader");
            try {
                PushbackInputStream is = new PushbackInputStream(mySocket.getInputStream());
                OutputStream os = mySocket.getOutputStream();
                while (!myDone) {

                    try {
                        // Blocking one-byte peek; the byte is pushed back so the decoder sees the full request
                        int nextChar = is.read();
                        ourLog.info("Read: " + nextChar);
                        if (nextChar == -1) {
                            // End of stream: the peer closed the connection, nothing more will arrive
                            ourLog.info("Peer closed the connection");
                            break;
                        }
                        is.unread(nextChar);
                    } catch (SocketTimeoutException e) {
                        // No data within the socket timeout; fall through and re-check the flags
                    }

                    if (is.available() > 0) {
                        ourLog.info("Socket reader has data");
                        handleRequest(is, os);
                    } else {
                        ourLog.trace("Socket reader has NO data");
                        try {
                            Thread.sleep(100);
                        } catch (Exception ignored) {
                            // Best-effort pause only; the loop condition decides when to stop
                        }
                    }

                    // NOTE(review): this also closes when no request was read in this pass - confirm intended
                    if (myCloseAfterEachMessage || myCloseNormallyAfterEachMessage) {
                        ourLog.info("Closing incoming socket...");
                        mySocket.close();
                        break;
                    }

                }
            } catch (Exception e) {
                ourLog.info("Failed!", e);
                fail(e.getMessage());
            } finally {
                closeSocketQuietly();
            }

            ourLog.info("Shutting down socket reader");

        }

        /**
         * Decodes one request, records its details on the enclosing server,
         * applies the next configured response delay and writes the ACK.
         */
        private void handleRequest(PushbackInputStream theInputStream, OutputStream theOutputStream) throws Exception {
            Hl7OverHttpRequestDecoder decoder = new Hl7OverHttpRequestDecoder();
            decoder.setAuthorizationCallback(myServerAuthCallback);
            decoder.readHeadersAndContentsFromInputStreamAndDecode(theInputStream);

            String message = decoder.getMessage();
            myMessage = message;
            myContentType = decoder.getContentType();
            myEncoding = decoder.getEncodingStyle();
            myRequestUri = decoder.getPathRaw();

            Message parsedMessage = GenericParser.getInstanceWithNoValidation().parse(message);
            Message reply = parsedMessage.generateACK();
            myReply = reply;

            // Take the delay under the lock but sleep outside it, so other connections are not blocked
            Long millis = pollNextResponseDelay();
            if (millis != null) {
                ourLog.info("Sleeping for {}ms", millis);
                Thread.sleep(millis.longValue());
            }

            Hl7OverHttpResponseEncoder encoder = new Hl7OverHttpResponseEncoder();
            encoder.setMessage(reply.encode());
            encoder.setGzipData(myGZipResponse);

            if (myCloseNormallyAfterEachMessage) {
                encoder.setAddConnectionCloseHeader(true);
            }

            if (mySimulateOneSecondPauseInChunkedEncoding) {
                writeChunkedResponseWithPause(encoder, theOutputStream);
            } else {
                encoder.encodeToOutputStream(theOutputStream);
            }
        }

        /**
         * Writes the encoded response by hand using chunked transfer
         * encoding: first half of the body, a one second pause, second half,
         * then the terminating zero-length chunk.
         */
        private void writeChunkedResponseWithPause(Hl7OverHttpResponseEncoder theEncoder, OutputStream theOutputStream) throws Exception {
            theEncoder.encode();
            theEncoder.getHeaders().remove("Content-Length");
            theEncoder.getHeaders().put("Transfer-Encoding", "chunked");

            writeText(theOutputStream, "HTTP/1.1 200 OK\r\n");
            for (Map.Entry<String, String> next : theEncoder.getHeaders().entrySet()) {
                String nextHeader = next.getKey() + ": " + next.getValue();
                ourLog.debug("Sending response header - " + nextHeader);
                writeText(theOutputStream, nextHeader + "\r\n");
            }
            writeText(theOutputStream, "\r\n");

            byte[] bytes = theEncoder.getData();
            int halfLength = bytes.length / 2;

            ourLog.debug("Sending chunk length: {}", halfLength);
            writeText(theOutputStream, Integer.toHexString(halfLength));
            writeText(theOutputStream, "\r\n");
            theOutputStream.write(bytes, 0, halfLength);
            writeText(theOutputStream, "\r\n");
            theOutputStream.flush();

            Thread.sleep(1000);

            int remaining = bytes.length - halfLength;

            ourLog.debug("Sending chunk length: {}", remaining);
            byte[] bytesToSend = Integer.toHexString(remaining).getBytes(HTTP_CHARSET_NAME);
            ourLog.debug("Sending bytes: {}", bytesToSend);
            theOutputStream.write(bytesToSend);
            writeText(theOutputStream, "\r\n");
            theOutputStream.write(bytes, halfLength, remaining);
            writeText(theOutputStream, "\r\n");

            // NOTE(review): one CRLF more than the "0\r\n\r\n" last-chunk sequence; kept as-is - confirm intentional
            writeText(theOutputStream, "0\r\n\r\n\r\n");
            theOutputStream.flush();
        }

        /** Writes the given text to the stream encoded as ISO-8859-1. */
        private void writeText(OutputStream theOutputStream, String theText) throws IOException {
            theOutputStream.write(theText.getBytes(HTTP_CHARSET_NAME));
        }

        /** Closes the connection socket, logging instead of propagating any failure. */
        private void closeSocketQuietly() {
            try {
                mySocket.close();
            } catch (IOException e) {
                ourLog.debug("Failed to close incoming socket", e);
            }
        }

    }

}