View Javadoc

1   package ca.uhn.hl7v2.hoh.encoder;
2   
3   import java.io.ByteArrayOutputStream;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.net.SocketException;
7   import java.net.SocketTimeoutException;
8   import java.nio.charset.Charset;
9   import java.nio.charset.UnsupportedCharsetException;
10  import java.util.ArrayList;
11  import java.util.LinkedHashMap;
12  import java.util.List;
13  import java.util.Map;
14  import java.util.regex.Pattern;
15  
16  import ca.uhn.hl7v2.hoh.api.DecodeException;
17  import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
18  import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
19  import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
20  import ca.uhn.hl7v2.hoh.util.ByteUtils;
21  import ca.uhn.hl7v2.hoh.util.GZipUtils;
22  import ca.uhn.hl7v2.hoh.util.IOUtils;
23  import ca.uhn.hl7v2.hoh.util.StringUtils;
24  import ca.uhn.hl7v2.hoh.util.repackage.Base64;
25  
26  public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {
27  
28  	private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");
29  
30  	/**
31  	 * Default amount of time that the decoder will attempt to read before
32  	 * timing out and throwing an IOException (30000ms)
33  	 * 
34  	 * @see #setReadTimeout(long)
35  	 */
36  	public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
37  
38  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);
39  
40  	private byte[] myBytes;
41  	private List<String> myConformanceProblems;
42  	private int myContentLength = -1;
43  	private String myContentType;
44  	private boolean myGzipCoding;
45  	private long myLastStartedReading;
46  	private long myReadTimeout = DEFAULT_READ_TIMEOUT;
47  	private String myResponseName;
48  	private Integer myResponseStatus;
49  	private TransferEncoding myTransferEncoding;
50  	private String mySignature;
51  	private EncodingStyle myEncodingStyle;
52  
53  	private boolean myConnectionCloseHeaderIsPresent;
54  
55  	private void addConformanceProblem(String theString) {
56  		ourLog.debug("Conformance problem detected: {}", theString);
57  		if (myConformanceProblems == null) {
58  			myConformanceProblems = new ArrayList<String>();
59  		}
60  		myConformanceProblems.add(theString);
61  	}
62  
63  	protected abstract void authorize() throws AuthorizationFailureException;
64  
65  	public void decode() throws DecodeException, SignatureVerificationException {
66  		ourLog.trace("Entering decode()");
67  		
68  		verifyNotUsed();
69  
70  		decodeHeaders();
71  		authorize();
72  		decodeBody();
73  		verifySignature();
74  
75  		ourLog.trace("Exiting decode()");
76  	}
77  
78  	private void decodeBody() throws DecodeException {
79  		byte[] bytes = myBytes;
80  
81  		if (myGzipCoding) {
82  			ourLog.debug("Decoding message contents using GZIP encoding style");
83  			try {
84  				bytes = GZipUtils.uncompress(bytes);
85  			} catch (IOException e) {
86  				throw new DecodeException("Failed to uncompress GZip content", e);
87  			}
88  		}
89  
90  		Charset charset = getCharset();
91  		
92  		ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
93  		if (ourLog.isTraceEnabled()) {
94  			ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
95  		}
96  		
97  		String messageString = new String(bytes, charset);
98  		setMessage(messageString);
99  	}
100 
101 	private void decodeHeaders() throws DecodeException {
102 
103 		ourLog.trace("Header map contains: {}", getHeaders());
104 
105 		for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
106 			String nextHeader = nextEntry.getKey().toLowerCase();
107 			String nextValue = nextEntry.getValue();
108 
109 			ourLog.trace("Next header: {}={}", nextHeader, nextValue);
110 			
111 			if ("transfer-encoding".equals(nextHeader)) {
112 				if ("chunked".equalsIgnoreCase(nextValue)) {
113 					myTransferEncoding = TransferEncoding.CHUNKED;
114 					ourLog.trace("Found chunked transfer encoding");
115 				} else {
116 					throw new DecodeException("Unknown transfer encoding: " + nextValue);
117 				}
118 			} else if ("connection".equals(nextHeader)) {
119 				if ("close".equals(nextValue)) {
120 					myConnectionCloseHeaderIsPresent = true;
121 				}
122 			} else if ("content-length".equals(nextHeader)) {
123 				try {
124 					myContentLength = Integer.parseInt(nextValue);
125 					ourLog.trace("Found content length: {}", myContentLength);
126 				} catch (NumberFormatException e) {
127 					addConformanceProblem("Could not parse Content-Length header value: " + nextHeader);
128 				}
129 			} else if ("content-type".equals(nextHeader)) {
130 				int colonIndex = nextValue.indexOf(';');
131 				if (colonIndex == -1) {
132 					myContentType = nextValue.trim();
133 				} else {
134 					myContentType = nextValue.substring(0, colonIndex).trim();
135 					String charsetDef = nextValue.substring(colonIndex + 1).trim();
136 					if (charsetDef.startsWith("charset=")) {
137 						String charsetName = charsetDef.substring(8);
138 						Charset charset;
139 						try {
140 							charset = Charset.forName(charsetName);
141 						} catch (UnsupportedCharsetException e) {
142 							addConformanceProblem("Unsupported or invalid charset: " + charsetName);
143 							continue;
144 						}
145 						setCharset(charset);
146 					}
147 				}
148 
149 				myEncodingStyle = EncodingStyle.getEncodingStyleForContentType(myContentType);
150 				ourLog.trace("Found content type {} with resolves to encoding style {}", myContentType, myEncodingStyle);
151 
152 			} else if ("authorization".equals(nextHeader)) {
153 				int spaceIndex = nextValue.indexOf(' ');
154 				if (spaceIndex == -1) {
155 					throw new DecodeException("Invalid authorization header. No authorization style detected");
156 				}
157 				String type = nextValue.substring(0, spaceIndex);
158 				if ("basic".equalsIgnoreCase(type)) {
159 					String encodedCredentials = nextValue.substring(spaceIndex + 1);
160 					byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
161 					String credentialsString = new String(decodedCredentials, getDefaultCharset());
162 					int colonIndex = credentialsString.indexOf(':');
163 					if (colonIndex == -1) {
164 						setUsername(credentialsString);
165 					} else {
166 						setUsername(credentialsString.substring(0, colonIndex));
167 						setPassword(credentialsString.substring(colonIndex + 1));
168 					}
169 					
170 					ourLog.trace("Found authorization header with username: {}", getUsername());
171 					
172 				} else {
173 					addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
174 				}
175 				
176 			} else if ("content-encoding".equals(nextHeader)) {
177 				if (StringUtils.isNotBlank(nextValue)) {
178 					if ("gzip".equals(nextValue)) {
179 						myGzipCoding = true;
180 					} else {
181 						throw new DecodeException("Unknown Content-Encoding: " + nextValue);
182 					}
183 				}
184 				ourLog.trace("Found content coding: {}", nextValue);
185 			} else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
186 				ourLog.trace("Found signature: {}", nextValue);
187 				mySignature = nextValue;
188 			} else {
189 				ourLog.trace("Ignoring header {}={}", nextHeader, nextValue);
190 			}
191 
192 		}
193 
194 		ourLog.trace("Done processing headers");
195 
196 	}
197 
198 	/**
199 	 * Protected because this doesn't make sense for a sender
200 	 */
201 	protected boolean isConnectionCloseHeaderPresent() {
202 		return myConnectionCloseHeaderIsPresent;
203 	}
204 
205 	/**
206 	 * Returns the {@link EncodingStyle} associated with the incoming message,
207 	 * or <code>null</code>. This will be set automatically based on the value
208 	 * of the <code>Content-Type</code> header, and will be set to
209 	 * <code>null</code> if the content type is not provided, or if the content
210 	 * type does not correspond to an HL7 type.
211 	 * 
212 	 * @see {@link EncodingStyle} for a list of appropriate content types
213 	 */
214 	public EncodingStyle getEncodingStyle() {
215 		return myEncodingStyle;
216 	}
217 
218 	private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, AuthorizationFailureException, IOException, SignatureVerificationException {
219 		decodeHeaders();
220 		authorize();
221 		if (myTransferEncoding == TransferEncoding.CHUNKED) {
222 			myBytes = readBytesChunked(theInputStream);
223 		} else {
224 			myBytes = readBytesNonChunked(theInputStream);
225 		}
226 
227 		decodeBody();
228 		
229 		if (getContentType() == null) {
230 			throw new DecodeException("Content-Type not specified");
231 		}
232 		if (getEncodingStyle() == null) {
233 			throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
234 		}
235 		
236 		verifySignature();
237 	}
238 
239 	private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
240 		ourLog.debug("Decoding message bytes using CHUNKED encoding style");
241 		byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
242 		ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);
243 
244 		while (true) {
245 			String nextSize;
246 			try {
247 				nextSize = readLine(theInputStream);
248 			} catch (IOException e) {
249 				throw new DecodeException("Failed to decode CHUNKED encoding", e);
250 			}
251 
252 			ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);
253 			
254 			if (nextSize.length() == 0) {
255 				break;
256 			}
257 
258 			int nextSizeInt;
259 			try {
260 				nextSizeInt = Integer.parseInt(nextSize, 16);
261 			} catch (NumberFormatException e) {
262 				throw new DecodeException("Failed to decode CHUNKED encoding", e);
263 			}
264 
265 			ourLog.debug("Next CHUNKED size: {}", nextSizeInt);
266 
267 			if (nextSizeInt < 0) {
268 				throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
269 			}
270 
271 			boolean trailing = false;
272 			if (nextSizeInt > 0) {
273 				int totalRead = 0;
274 				myLastStartedReading = System.currentTimeMillis();
275 				do {
276 					int nextRead = Math.min(nextSizeInt, byteBuffer.length);
277 					int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
278 					if (bytesRead == -1) {
279 						ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
280 						throw new DecodeException("Reached EOF while reading in message chunk");
281 					}
282 					if (bytesRead == 0 && totalRead < nextSizeInt) {
283 						pauseDuringTimedOutRead();
284 					}
285 					totalRead += bytesRead;
286 
287 					if (ourLog.isTraceEnabled()) {
288 						ourLog.trace("Read {} byte chunk: {}", bytesRead, new String(byteBuffer, 0, bytesRead));
289 					}else {
290 						ourLog.debug("Read {} byte chunk", bytesRead);
291 					}
292 					
293 					bos.write(byteBuffer, 0, bytesRead);
294 
295 				} while (totalRead < nextSizeInt);
296 			} else {
297 				trailing = true;
298 			}
299 
300 			// Try to read a trailing CRLF
301 			int nextChar;
302 			boolean had13 = false;
303 			boolean had10 = false;
304 			while (true) {
305 				try {
306 					nextChar = theInputStream.read();
307 					if (ourLog.isTraceEnabled()) {
308 						ourLog.trace("Read byte: " + (char)nextChar + " (" + nextChar + ")");
309 					}
310 				} catch (SocketTimeoutException e) {
311 					break;
312 				}
313 
314 				if (nextChar == -1) {
315 					break;
316 				} else if (nextChar == 13) {
317 					if (had13) {
318 						/* 
319 						 * This is an attempt to be tolerant of people using the wrong
320 						 * end of line sequence (it should be CRLF), as is the 
321 						 * had10 below 
322 						 */
323 						trailing = true;
324 					}
325 					had13 = true;
326 					continue;
327 				} else if (nextChar == 10) {
328 					if (had10) {
329 						trailing = true;
330 					}
331 					break;
332 				} else {
333 					break;
334 				}
335 			}
336 			
337 			if (trailing) {
338 				break;
339 			}
340 
341 		} // while
342 
343 		return bos.toByteArray();
344 	}
345 
346 	private void verifySignature() throws SignatureVerificationException, DecodeException {
347 		if (getSigner() != null && StringUtils.isBlank(mySignature)) {
348 			String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
349 			throw new SignatureVerificationException("No HL7 Signature found in " + mode);
350 		}
351 		if (getSigner() != null) {
352 			try {
353 				getSigner().verify(myBytes, mySignature);
354 			} catch (SignatureFailureException e) {
355 				throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
356 			}
357 		}
358 	}
359 
360 	public List<String> getConformanceProblems() {
361 		if (myConformanceProblems == null) {
362 			myConformanceProblems = new ArrayList<String>();
363 		}
364 		return myConformanceProblems;
365 	}
366 
367 	/**
368 	 * @return Returns the content type associated with the message (e.g. application/hl7-v2)
369 	 */
370 	public String getContentType() {
371 		return myContentType;
372 	}
373 
374 	/**
375 	 * @return the responseName
376 	 */
377 	public String getResponseName() {
378 		return myResponseName;
379 	}
380 
381 	/**
382 	 * @return the responseStatus
383 	 */
384 	public Integer getResponseStatus() {
385 		return myResponseStatus;
386 	}
387 
388 	protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;
389 
390 	private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
391 		ourLog.debug("Decoding message bytes using non-chunked encoding style");
392 
393 		int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
394 		ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
395 
396 		byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
397 		myLastStartedReading = System.currentTimeMillis();
398 		while ((myContentLength < 0 || bos.size() < myContentLength)) {
399 			if (myContentLength < 0) {
400 				try {
401 					if (theInputStream.available() <= 0) {
402 						ourLog.trace("No more bytes available");
403 						break;
404 					}
405 				} catch (IOException e) {
406 					ourLog.debug("Received IOException while calling inputStream#available()", e);
407 					throw e;
408 				}
409 			}
410 
411 			int max;
412 			if (myContentLength > 0) {
413 				max = myContentLength - bos.size();
414 				max = Math.min(max, buffer.length);
415 			} else {
416 				max = buffer.length;
417 			}
418 			
419 			try {
420 				int bytesRead = theInputStream.read(buffer, 0, max);
421 				myLastStartedReading = System.currentTimeMillis();
422 				if (bytesRead == -1) {
423 					ourLog.trace("Read end of stream");
424 					break;
425 				} else {
426 					if (ourLog.isTraceEnabled()) {
427 						ourLog.trace("Read {} bytes from stream:\n{}", bytesRead, ByteUtils.formatBytesForLogging(bytesRead, 0, buffer));
428 					}
429 				}
430 				bos.write(buffer, 0, bytesRead);
431 			} catch (SocketTimeoutException e) {
432 				long elapsed = System.currentTimeMillis() - myLastStartedReading;
433 				if (elapsed > myReadTimeout) {
434 					throw e;
435 				} else {
436 					ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
437 					try {
438 						Thread.sleep(100);
439 					} catch (InterruptedException e1) {
440 						// ignore
441 					}
442 				}
443 			} catch (IOException e) {
444 				ourLog.debug("Received IOException while calling inputStream#available()", e);
445 				throw e;
446 			}
447 		}
448 
449 		return bos.toByteArray();
450 	}
451 
452 	/**
453 	 * Read in the contents of the raw message from the input stream and decode
454 	 * entire the message. This method assumes that the headers have been
455 	 * provided using {@link #setHeaders(LinkedHashMap)}
456 	 * 
457 	 * @param theInputStream
458 	 *            The inputstream to read the raw message from
459 	 * @throws AuthorizationFailureException
460 	 *             If the authorization check fails. This will only be thrown if
461 	 *             this decoder is decoding a request message, and an
462 	 *             authorization callback has been provided, and the
463 	 *             authorization fails.
464 	 * @throws DecodeException
465 	 *             If the message can not be decoded for any reason
466 	 * @throws IOException
467 	 *             If there is a failure while reading from the inputstream
468 	 * @throws SignatureVerificationException
469 	 *             If the signature verification fails. This will only occur if
470 	 *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
471 	 *             has been provided.
472 	 */
473 	public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
474 		verifyNotUsed();
475 
476 		doReadContentsFromInputStreamAndDecode(theInputStream);
477 	}
478 
479 	protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
480 		ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
481 		String retVal = readLine(theInputStream, true);
482 		ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
483 		return retVal;
484 	}
485 
486 	/**
487 	 * Note that if {@link #setPath(String)} is called, this method will assume
488 	 * that the first line of the HTTP request has already been read from the
489 	 * input stream. If {@link #setHeaders(java.util.LinkedHashMap)} has been
490 	 * called, this method will assume that the HTTP headers have already been
491 	 * read from the input stream as well as the double-LF (ASCII-10) that
492 	 * proceeds the headers.
493 	 * 
494 	 * 
495 	 * @param theInputStream
496 	 *            The inputstream to read the raw message from
497 	 * @throws AuthorizationFailureException
498 	 *             If the authorization check fails. This will only be thrown if
499 	 *             this decoder is decoding a request message, and an
500 	 *             authorization callback has been provided, and the
501 	 *             authorization fails.
502 	 * @throws DecodeException
503 	 *             If the message can not be decoded for any reason
504 	 * @throws IOException
505 	 *             If there is a failure while reading from the inputstream
506 	 * @throws SignatureVerificationException
507 	 *             If the signature verification fails. This will only occur if
508 	 *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
509 	 *             has been provided.
510 	 */
511 	public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
512 		verifyNotUsed();
513 
514 		String actionLine = readActionLineAndDecode(theInputStream);
515 
516 		ourLog.debug("Read action line: {}", actionLine);
517 
518 		if (getHeaders() == null) {
519 			setHeaders(new LinkedHashMap<String, String>());
520 
521 			while (true) {
522 				String nextLine = readLine(theInputStream);
523 				if (nextLine.length() == 0) {
524 					break;
525 				}
526 
527 				int colonIndex = nextLine.indexOf(':');
528 				if (colonIndex == -1) {
529 					throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
530 				}
531 
532 				String key = nextLine.substring(0, colonIndex);
533 				String value = nextLine.substring(colonIndex + 1).trim();
534 				
535 				ourLog.debug("Read header {}={}", key,value);
536 				
537 				getHeaders().put(key, value);
538 			}
539 		}
540 
541 		doReadContentsFromInputStreamAndDecode(theInputStream);
542 
543 	}
544 
545 	private String readLine(InputStream theInputStream) throws IOException {
546 		try {
547 			return readLine(theInputStream, false);
548 		} catch (NoMessageReceivedException e) {
549 			throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
550 		}
551 	}
552 
553 	private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {
554 		
555 		myLastStartedReading = System.currentTimeMillis();
556 
557 		StringBuilder retVal = new StringBuilder();
558 		while (true) {
559 
560 			int b;
561 			try {
562 				b = theInputStream.read();
563 				if (ourLog.isTraceEnabled()) {
564 					ourLog.trace("Read byte: " + (char)b + " (" + b + ")");
565 				}
566 			} catch (SocketTimeoutException e) {
567 				if (retVal.length() == 0 && theFirstLine) {
568 					ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
569 					throw new NoMessageReceivedException();
570 				}
571 				ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
572 				pauseDuringTimedOutRead();
573 				continue;
574 			}
575 
576 			if (b == 13) {
577 				continue;
578 			} else if (b == 10) {
579 				break;
580 			} else if (b == -1) {
581 				ourLog.debug("Current read line is: {}", retVal);
582 				ourLog.info("Read -1 from input stream, closing it");
583 				theInputStream.close();
584 				if (retVal.length() == 0) {
585 					throw new SocketException("Received EOF from input stream");
586 				}
587 				break;
588 			} else if (b < ' ') {
589 				continue;
590 			} else {
591 				retVal.append((char) b);
592 			}
593 		}
594 
595 		ourLog.debug("Current read line is: {}", retVal);
596 
597 		return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
598 	}
599 
600 	private void pauseDuringTimedOutRead() throws SocketTimeoutException {
601 		long elapsed = System.currentTimeMillis() - myLastStartedReading;
602 		if (elapsed > myReadTimeout) {
603 			ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
604 			throw new SocketTimeoutException();
605 		}
606 		try {
607 			Thread.sleep(100);
608 		} catch (InterruptedException e1) {
609 			// ignore
610 		}
611 	}
612 
613 	/**
614 	 * Sets the number of milliseconds that the decoder will attempt to read
615 	 * from an InputStream before timing out and throwing an exception
616 	 */
617 	public void setReadTimeout(long theReadTimeout) {
618 		myReadTimeout = theReadTimeout;
619 	}
620 
621 	/**
622 	 * @param theResponseName
623 	 *            the responseName to set
624 	 */
625 	public void setResponseName(String theResponseName) {
626 		myResponseName = theResponseName;
627 	}
628 
629 	/**
630 	 * @param theResponseStatus
631 	 *            the responseStatus to set
632 	 */
633 	public void setResponseStatus(Integer theResponseStatus) {
634 		myResponseStatus = theResponseStatus;
635 	}
636 
637 }