View Javadoc

1   package ca.uhn.hl7v2.hoh.llp;
2   
3   import static org.junit.Assert.fail;
4   
5   import java.io.IOException;
6   import java.io.OutputStream;
7   import java.io.PushbackInputStream;
8   import java.net.InetAddress;
9   import java.net.InetSocketAddress;
10  import java.net.ServerSocket;
11  import java.net.Socket;
12  import java.net.SocketTimeoutException;
13  import java.util.Arrays;
14  import java.util.LinkedList;
15  import java.util.Map;
16  import java.util.concurrent.CountDownLatch;
17  
18  import ca.uhn.hl7v2.hoh.api.IAuthorizationServerCallback;
19  import ca.uhn.hl7v2.hoh.encoder.EncodingStyle;
20  import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpRequestDecoder;
21  import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpResponseEncoder;
22  import ca.uhn.hl7v2.hoh.sockets.CustomCertificateTlsSocketFactory;
23  import ca.uhn.hl7v2.hoh.sockets.ISocketFactory;
24  import ca.uhn.hl7v2.hoh.sockets.StandardSocketFactory;
25  import ca.uhn.hl7v2.model.Message;
26  import ca.uhn.hl7v2.parser.GenericParser;
27  
28  public class ServerSocketThreadForTesting extends Thread {
29  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ServerSocketThreadForTesting.class);
30  
31  	private boolean myCloseAfterEachMessage;
32  	private int myConnectionCount = 0;
33  	private String myContentType;
34  	private boolean myDone;
35  	private EncodingStyle myEncoding;
36  	private boolean myGZipResponse;
37  	private CountDownLatch myLatch = new CountDownLatch(1);
38  	private String myMessage;
39  	private int myPort;
40  	private Message myReply;
41  	private String myRequestUri;
42  	private IAuthorizationServerCallback myServerAuthCallback;
43  	private ServerSocket myServerSocket;
44  	private boolean mySimulateOneSecondPauseInChunkedEncoding;
45  	private ISocketFactory mySocketFactory;
46  	private LinkedList<Long> myResponseDelays = new LinkedList<Long>();
47  
48  	private boolean myCloseNormallyAfterEachMessage;
49  
50  	public void setResponseDelays(Long... theResponseDelays) {
51  		myResponseDelays = new LinkedList<Long>(Arrays.asList(theResponseDelays));
52  	}
53  	
54  	public ServerSocketThreadForTesting(int thePort) {
55  		myPort = thePort;
56  		mySocketFactory = new StandardSocketFactory();
57  	}
58  
59  	public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback) {
60  		myPort = thePort;
61  		myServerAuthCallback = theServerAuthCallback;
62  		mySocketFactory = new StandardSocketFactory();
63  	}
64  
65  	public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback, ISocketFactory theSocketFactory) {
66  		myPort = thePort;
67  		myServerAuthCallback = theServerAuthCallback;
68  		mySocketFactory = theSocketFactory;
69  	}
70  
71  	public void done() {
72  		myDone = true;
73  	}
74  
75  	/**
76  	 * @return the connectionCount
77  	 */
78  	public int getConnectionCount() {
79  		return myConnectionCount;
80  	}
81  
82  	/**
83  	 * @return the contentType
84  	 */
85  	public String getContentType() {
86  		return myContentType;
87  	}
88  
89  	/**
90  	 * @return the encoding
91  	 */
92  	public EncodingStyle getEncoding() {
93  		return myEncoding;
94  	}
95  
96  	/**
97  	 * @return the latch
98  	 */
99  	public CountDownLatch getLatch() {
100 		return myLatch;
101 	}
102 
103 	/**
104 	 * @return the message
105 	 */
106 	public String getMessage() {
107 		return myMessage;
108 	}
109 
110 	/**
111 	 * @return the reply
112 	 */
113 	public Message getReply() {
114 		return myReply;
115 	}
116 
117 	public String getRequestUri() {
118 		return myRequestUri;
119 	}
120 
121 	@Override
122 	public void run() {
123 
124 		ourLog.info("Starting server on {}", myPort);
125 
126 		myConnectionCount = 0;
127 
128 		Exception ex = null;
129 		try {
130 			myServerSocket = mySocketFactory.createServerSocket();
131 			myServerSocket.bind(new InetSocketAddress((InetAddress) null, myPort), 50);
132 
133 			// myServerSocket = new ServerSocket(myPort);
134 			myServerSocket.setSoTimeout(1000);
135 
136 			while (!myDone) {
137 				try {
138 					ourLog.trace("Going to accept()");
139 					Socket newSocket = myServerSocket.accept();
140 					myConnectionCount++;
141 					newSocket.setSoTimeout(1000);
142 					ourLog.info("New socket: {}", newSocket.getInetAddress().toString());
143 					TestSocketThread t = new TestSocketThread(newSocket);
144 					t.start();
145 				} catch (SocketTimeoutException e) {
146 					ourLog.trace("No new connection");
147 				}
148 				myLatch.countDown();
149 			}
150 
151 		} catch (Exception e) {
152 			ourLog.error("Weird exception!", e);
153 			ex = e;
154 		}
155 
156 		ourLog.info("Shutting down, done is {}", myDone);
157 
158 		try {
159 			myServerSocket.close();
160 		} catch (IOException e) {
161 			ourLog.error("Failed to close", e);
162 		}
163 
164 		if (ex != null) {
165 			fail(ex.getMessage());
166 		}
167 
168 	}
169 
170 	public void setCloseUnexpectedlyAfterEachMessage() {
171 		myCloseAfterEachMessage = true;
172 	}
173 
174 	/**
175 	 * @param theGZipResponse
176 	 *            the gZipResponse to set
177 	 */
178 	public void setGZipResponse(boolean theGZipResponse) {
179 		myGZipResponse = theGZipResponse;
180 	}
181 
182 	/**
183 	 * @param theMessage
184 	 *            the message to set
185 	 */
186 	public void setMessage(String theMessage) {
187 		myMessage = theMessage;
188 	}
189 
190 	/**
191 	 * @param theReply
192 	 *            the reply to set
193 	 */
194 	public void setReply(Message theReply) {
195 		myReply = theReply;
196 	}
197 
198 	public void setServerSockewtFactory(CustomCertificateTlsSocketFactory theServerSocketFactory) {
199 		mySocketFactory = theServerSocketFactory;
200 	}
201 
202 	public void setSimulateOneSecondPauseInChunkedEncoding(boolean theB) {
203 		mySimulateOneSecondPauseInChunkedEncoding = theB;
204 	}
205 
206 	public class TestSocketThread extends Thread {
207 
208 		private Socket mySocket;
209 
210 		public TestSocketThread(Socket theSocket) {
211 			mySocket = theSocket;
212 		}
213 
214 		@Override
215 		public void run() {
216 
217 			ourLog.info("Starting socket reader");
218 			try {
219 				PushbackInputStream is = new PushbackInputStream(mySocket.getInputStream());
220 				OutputStream os = mySocket.getOutputStream();
221 				while (!myDone) {
222 
223 					try {
224 						int nextChar = is.read();
225 						ourLog.info("Read: " + nextChar);
226 						if (nextChar > 0) {
227 							is.unread(nextChar);
228 						}
229 
230 					} catch (SocketTimeoutException e) {
231 						// ignore
232 					}
233 
234 					if (is.available() > 0) {
235 						ourLog.info("Socket reader has data");
236 
237 						// byte[] bis =
238 						// IOUtils.readInputStreamIntoByteArraWhileDataAvailable(is);
239 						// ourLog.info("Received input:\n" + new String(bis,
240 						// HTTPUtils.DEFAULT_CHARSET));
241 
242 						Hl7OverHttpRequestDecoder d = new Hl7OverHttpRequestDecoder();
243 						d.setAuthorizationCallback(myServerAuthCallback);
244 						d.readHeadersAndContentsFromInputStreamAndDecode(is);
245 						myMessage = d.getMessage();
246 						myContentType = d.getContentType();
247 						myEncoding = d.getEncodingStyle();
248 						myRequestUri = d.getPathRaw();
249 
250 						Message parsedMessage = GenericParser.getInstanceWithNoValidation().parse(myMessage);
251 						myReply = parsedMessage.generateACK();
252 
253 						synchronized (myResponseDelays) {
254 							if (myResponseDelays.size() > 0) {
255 								Long millis = myResponseDelays.removeFirst();
256 								ourLog.info("Sleeping for {}ms", millis);
257 								Thread.sleep(millis);
258 							}
259 						}
260 						
261 						Hl7OverHttpResponseEncoder e = new Hl7OverHttpResponseEncoder();
262 						e.setMessage(myReply.encode());
263 						e.setGzipData(myGZipResponse);
264 
265 						if (myCloseNormallyAfterEachMessage) {
266 							e.setAddConnectionCloseHeader(true);
267 						}
268 						
269 						if (mySimulateOneSecondPauseInChunkedEncoding) {
270 							e.encode();
271 							e.getHeaders().remove("Content-Length");
272 							e.getHeaders().put("Transfer-Encoding", "chunked");
273 
274 							OutputStream tempOs = os;
275 							tempOs.write("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
276 							for (Map.Entry<String, String> next : e.getHeaders().entrySet()) {
277 								String nextHeader = next.getKey() + ": " + next.getValue();
278 								ourLog.debug("Sending response header - " + nextHeader);
279 								tempOs.write((nextHeader + "\r\n").getBytes("ISO-8859-1"));
280 							}
281 
282 							tempOs.write("\r\n".getBytes("ISO-8859-1"));
283 
284 							byte[] bytes = e.getData();
285 							int halfLength = bytes.length / 2;
286 
287 							String chunkLength = Integer.toHexString(halfLength);
288 							ourLog.debug("Sending chunk length: {}", halfLength);
289 
290 							tempOs.write(chunkLength.getBytes("ISO-8859-1"));
291 							tempOs.write("\r\n".getBytes("ISO-8859-1"));
292 							tempOs.write(bytes, 0, halfLength);
293 							tempOs.write("\r\n".getBytes("ISO-8859-1"));
294 							tempOs.flush();
295 
296 							Thread.sleep(1000);
297 
298 							int remaining = bytes.length - halfLength;
299 
300 							String remChunkLength = Integer.toHexString(remaining);
301 							ourLog.debug("Sending chunk length: {}", remaining);
302 							byte[] bytesToSend = remChunkLength.getBytes("ISO-8859-1");
303 							ourLog.debug("Sending bytes: {}", bytesToSend);
304 							tempOs.write(bytesToSend);
305 							tempOs.write("\r\n".getBytes("ISO-8859-1"));
306 							tempOs.write(bytes, halfLength, remaining);
307 							tempOs.write("\r\n".getBytes("ISO-8859-1"));
308 
309 							tempOs.write("0\r\n\r\n\r\n".getBytes("ISO-8859-1"));
310 							tempOs.flush();
311 
312 						} else {
313 
314 							e.encodeToOutputStream(os);
315 
316 						}
317 
318 					} else {
319 						ourLog.trace("Socket reader has NO data");
320 						try {
321 							Thread.sleep(100);
322 						} catch (Exception e) {
323 							// ignore
324 						}
325 					}
326 
327 					if (myCloseAfterEachMessage) {
328 						ourLog.info("Closing incoming socket...");
329 						mySocket.close();
330 						break;
331 					}
332 
333 					if (myCloseNormallyAfterEachMessage) {
334 						ourLog.info("Closing incoming socket...");
335 						mySocket.close();
336 						break;
337 					}
338 					
339 				}
340 			} catch (Exception e) {
341 				ourLog.info("Failed!", e);
342 				fail(e.getMessage());
343 			}
344 
345 			ourLog.info("Shutting down socket reader");
346 
347 		}
348 
349 	}
350 
351 	public void setCloseNormallyWithHeaderAfterEachMessage() {
352 		myCloseNormallyAfterEachMessage=true;
353 	}
354 
355 }