1 package ca.uhn.hl7v2.hoh.llp;
2
3 import static org.junit.Assert.fail;
4
5 import java.io.IOException;
6 import java.io.OutputStream;
7 import java.io.PushbackInputStream;
8 import java.net.InetAddress;
9 import java.net.InetSocketAddress;
10 import java.net.ServerSocket;
11 import java.net.Socket;
12 import java.net.SocketTimeoutException;
13 import java.util.Arrays;
14 import java.util.LinkedList;
15 import java.util.Map;
16 import java.util.concurrent.CountDownLatch;
17
18 import ca.uhn.hl7v2.hoh.api.IAuthorizationServerCallback;
19 import ca.uhn.hl7v2.hoh.encoder.EncodingStyle;
20 import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpRequestDecoder;
21 import ca.uhn.hl7v2.hoh.encoder.Hl7OverHttpResponseEncoder;
22 import ca.uhn.hl7v2.hoh.sockets.CustomCertificateTlsSocketFactory;
23 import ca.uhn.hl7v2.hoh.sockets.ISocketFactory;
24 import ca.uhn.hl7v2.hoh.sockets.StandardSocketFactory;
25 import ca.uhn.hl7v2.model.Message;
26 import ca.uhn.hl7v2.parser.GenericParser;
27
28 public class ServerSocketThreadForTesting extends Thread {
29 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ServerSocketThreadForTesting.class);
30
31 private boolean myCloseAfterEachMessage;
32 private int myConnectionCount = 0;
33 private String myContentType;
34 private boolean myDone;
35 private EncodingStyle myEncoding;
36 private boolean myGZipResponse;
37 private CountDownLatch myLatch = new CountDownLatch(1);
38 private String myMessage;
39 private int myPort;
40 private Message myReply;
41 private String myRequestUri;
42 private IAuthorizationServerCallback myServerAuthCallback;
43 private ServerSocket myServerSocket;
44 private boolean mySimulateOneSecondPauseInChunkedEncoding;
45 private ISocketFactory mySocketFactory;
46 private LinkedList<Long> myResponseDelays = new LinkedList<Long>();
47
48 private boolean myCloseNormallyAfterEachMessage;
49
50 public void setResponseDelays(Long... theResponseDelays) {
51 myResponseDelays = new LinkedList<Long>(Arrays.asList(theResponseDelays));
52 }
53
54 public ServerSocketThreadForTesting(int thePort) {
55 myPort = thePort;
56 mySocketFactory = new StandardSocketFactory();
57 }
58
59 public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback) {
60 myPort = thePort;
61 myServerAuthCallback = theServerAuthCallback;
62 mySocketFactory = new StandardSocketFactory();
63 }
64
65 public ServerSocketThreadForTesting(int thePort, IAuthorizationServerCallback theServerAuthCallback, ISocketFactory theSocketFactory) {
66 myPort = thePort;
67 myServerAuthCallback = theServerAuthCallback;
68 mySocketFactory = theSocketFactory;
69 }
70
71 public void done() {
72 myDone = true;
73 }
74
75
76
77
78 public int getConnectionCount() {
79 return myConnectionCount;
80 }
81
82
83
84
85 public String getContentType() {
86 return myContentType;
87 }
88
89
90
91
92 public EncodingStyle getEncoding() {
93 return myEncoding;
94 }
95
96
97
98
99 public CountDownLatch getLatch() {
100 return myLatch;
101 }
102
103
104
105
106 public String getMessage() {
107 return myMessage;
108 }
109
110
111
112
113 public Message getReply() {
114 return myReply;
115 }
116
117 public String getRequestUri() {
118 return myRequestUri;
119 }
120
121 @Override
122 public void run() {
123
124 ourLog.info("Starting server on {}", myPort);
125
126 myConnectionCount = 0;
127
128 Exception ex = null;
129 try {
130 myServerSocket = mySocketFactory.createServerSocket();
131 myServerSocket.bind(new InetSocketAddress((InetAddress) null, myPort), 50);
132
133
134 myServerSocket.setSoTimeout(1000);
135
136 while (!myDone) {
137 try {
138 ourLog.trace("Going to accept()");
139 Socket newSocket = myServerSocket.accept();
140 myConnectionCount++;
141 newSocket.setSoTimeout(1000);
142 ourLog.info("New socket: {}", newSocket.getInetAddress().toString());
143 TestSocketThread t = new TestSocketThread(newSocket);
144 t.start();
145 } catch (SocketTimeoutException e) {
146 ourLog.trace("No new connection");
147 }
148 myLatch.countDown();
149 }
150
151 } catch (Exception e) {
152 ourLog.error("Weird exception!", e);
153 ex = e;
154 }
155
156 ourLog.info("Shutting down, done is {}", myDone);
157
158 try {
159 myServerSocket.close();
160 } catch (IOException e) {
161 ourLog.error("Failed to close", e);
162 }
163
164 if (ex != null) {
165 fail(ex.getMessage());
166 }
167
168 }
169
170 public void setCloseUnexpectedlyAfterEachMessage() {
171 myCloseAfterEachMessage = true;
172 }
173
174
175
176
177
178 public void setGZipResponse(boolean theGZipResponse) {
179 myGZipResponse = theGZipResponse;
180 }
181
182
183
184
185
186 public void setMessage(String theMessage) {
187 myMessage = theMessage;
188 }
189
190
191
192
193
194 public void setReply(Message theReply) {
195 myReply = theReply;
196 }
197
198 public void setServerSockewtFactory(CustomCertificateTlsSocketFactory theServerSocketFactory) {
199 mySocketFactory = theServerSocketFactory;
200 }
201
202 public void setSimulateOneSecondPauseInChunkedEncoding(boolean theB) {
203 mySimulateOneSecondPauseInChunkedEncoding = theB;
204 }
205
206 public class TestSocketThread extends Thread {
207
208 private Socket mySocket;
209
210 public TestSocketThread(Socket theSocket) {
211 mySocket = theSocket;
212 }
213
214 @Override
215 public void run() {
216
217 ourLog.info("Starting socket reader");
218 try {
219 PushbackInputStream is = new PushbackInputStream(mySocket.getInputStream());
220 OutputStream os = mySocket.getOutputStream();
221 while (!myDone) {
222
223 try {
224 int nextChar = is.read();
225 ourLog.info("Read: " + nextChar);
226 if (nextChar > 0) {
227 is.unread(nextChar);
228 }
229
230 } catch (SocketTimeoutException e) {
231
232 }
233
234 if (is.available() > 0) {
235 ourLog.info("Socket reader has data");
236
237
238
239
240
241
242 Hl7OverHttpRequestDecoder d = new Hl7OverHttpRequestDecoder();
243 d.setAuthorizationCallback(myServerAuthCallback);
244 d.readHeadersAndContentsFromInputStreamAndDecode(is);
245 myMessage = d.getMessage();
246 myContentType = d.getContentType();
247 myEncoding = d.getEncodingStyle();
248 myRequestUri = d.getPathRaw();
249
250 Message parsedMessage = GenericParser.getInstanceWithNoValidation().parse(myMessage);
251 myReply = parsedMessage.generateACK();
252
253 synchronized (myResponseDelays) {
254 if (myResponseDelays.size() > 0) {
255 Long millis = myResponseDelays.removeFirst();
256 ourLog.info("Sleeping for {}ms", millis);
257 Thread.sleep(millis);
258 }
259 }
260
261 Hl7OverHttpResponseEncoder e = new Hl7OverHttpResponseEncoder();
262 e.setMessage(myReply.encode());
263 e.setGzipData(myGZipResponse);
264
265 if (myCloseNormallyAfterEachMessage) {
266 e.setAddConnectionCloseHeader(true);
267 }
268
269 if (mySimulateOneSecondPauseInChunkedEncoding) {
270 e.encode();
271 e.getHeaders().remove("Content-Length");
272 e.getHeaders().put("Transfer-Encoding", "chunked");
273
274 OutputStream tempOs = os;
275 tempOs.write("HTTP/1.1 200 OK\r\n".getBytes("ISO-8859-1"));
276 for (Map.Entry<String, String> next : e.getHeaders().entrySet()) {
277 String nextHeader = next.getKey() + ": " + next.getValue();
278 ourLog.debug("Sending response header - " + nextHeader);
279 tempOs.write((nextHeader + "\r\n").getBytes("ISO-8859-1"));
280 }
281
282 tempOs.write("\r\n".getBytes("ISO-8859-1"));
283
284 byte[] bytes = e.getData();
285 int halfLength = bytes.length / 2;
286
287 String chunkLength = Integer.toHexString(halfLength);
288 ourLog.debug("Sending chunk length: {}", halfLength);
289
290 tempOs.write(chunkLength.getBytes("ISO-8859-1"));
291 tempOs.write("\r\n".getBytes("ISO-8859-1"));
292 tempOs.write(bytes, 0, halfLength);
293 tempOs.write("\r\n".getBytes("ISO-8859-1"));
294 tempOs.flush();
295
296 Thread.sleep(1000);
297
298 int remaining = bytes.length - halfLength;
299
300 String remChunkLength = Integer.toHexString(remaining);
301 ourLog.debug("Sending chunk length: {}", remaining);
302 byte[] bytesToSend = remChunkLength.getBytes("ISO-8859-1");
303 ourLog.debug("Sending bytes: {}", bytesToSend);
304 tempOs.write(bytesToSend);
305 tempOs.write("\r\n".getBytes("ISO-8859-1"));
306 tempOs.write(bytes, halfLength, remaining);
307 tempOs.write("\r\n".getBytes("ISO-8859-1"));
308
309 tempOs.write("0\r\n\r\n\r\n".getBytes("ISO-8859-1"));
310 tempOs.flush();
311
312 } else {
313
314 e.encodeToOutputStream(os);
315
316 }
317
318 } else {
319 ourLog.trace("Socket reader has NO data");
320 try {
321 Thread.sleep(100);
322 } catch (Exception e) {
323
324 }
325 }
326
327 if (myCloseAfterEachMessage) {
328 ourLog.info("Closing incoming socket...");
329 mySocket.close();
330 break;
331 }
332
333 if (myCloseNormallyAfterEachMessage) {
334 ourLog.info("Closing incoming socket...");
335 mySocket.close();
336 break;
337 }
338
339 }
340 } catch (Exception e) {
341 ourLog.info("Failed!", e);
342 fail(e.getMessage());
343 }
344
345 ourLog.info("Shutting down socket reader");
346
347 }
348
349 }
350
351 public void setCloseNormallyWithHeaderAfterEachMessage() {
352 myCloseNormallyAfterEachMessage=true;
353 }
354
355 }