View Javadoc
1   package ca.uhn.hl7v2.hoh.encoder;
2   
3   import java.io.ByteArrayOutputStream;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.net.SocketException;
7   import java.net.SocketTimeoutException;
8   import java.nio.charset.Charset;
9   import java.nio.charset.UnsupportedCharsetException;
10  import java.util.ArrayList;
11  import java.util.LinkedHashMap;
12  import java.util.List;
13  import java.util.Map;
14  import java.util.regex.Pattern;
15  
16  import ca.uhn.hl7v2.hoh.api.DecodeException;
17  import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
18  import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
19  import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
20  import ca.uhn.hl7v2.hoh.util.ByteUtils;
21  import ca.uhn.hl7v2.hoh.util.GZipUtils;
22  import ca.uhn.hl7v2.hoh.util.IOUtils;
23  import ca.uhn.hl7v2.hoh.util.StringUtils;
24  import ca.uhn.hl7v2.hoh.util.repackage.Base64;
25  
26  public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {
27  
28  	private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");
29  
30  	/**
31  	 * Default amount of time that the decoder will attempt to read before
32  	 * timing out and throwing an IOException (30000ms)
33  	 * 
34  	 * @see #setReadTimeout(long)
35  	 */
36  	public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
37  
38  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);
39  
40  	private byte[] myBytes;
41  	private List<String> myConformanceProblems;
42  	private int myContentLength = -1;
43  	private String myContentType;
44  	private boolean myGzipCoding;
45  	private long myLastStartedReading;
46  	private long myReadTimeout = DEFAULT_READ_TIMEOUT;
47  	private String myResponseName;
48  	private Integer myResponseStatus;
49  	private TransferEncoding myTransferEncoding;
50  	private String mySignature;
51  	private EncodingStyle myEncodingStyle;
52  
53  	private boolean myConnectionCloseHeaderIsPresent;
54  
55  	private void addConformanceProblem(String theString) {
56  		ourLog.debug("Conformance problem detected: {}", theString);
57  		if (myConformanceProblems == null) {
58  			myConformanceProblems = new ArrayList<>();
59  		}
60  		myConformanceProblems.add(theString);
61  	}
62  
63  	protected abstract void authorize() throws AuthorizationFailureException;
64  
65  	public void decode() throws DecodeException, SignatureVerificationException {
66  		ourLog.trace("Entering decode()");
67  		
68  		verifyNotUsed();
69  
70  		decodeHeaders();
71  		authorize();
72  		decodeBody();
73  		verifySignature();
74  
75  		ourLog.trace("Exiting decode()");
76  	}
77  
78  	private void decodeBody() throws DecodeException {
79  		byte[] bytes = myBytes;
80  
81  		if (myGzipCoding) {
82  			ourLog.debug("Decoding message contents using GZIP encoding style");
83  			try {
84  				bytes = GZipUtils.uncompress(bytes);
85  			} catch (IOException e) {
86  				throw new DecodeException("Failed to uncompress GZip content", e);
87  			}
88  		}
89  
90  		Charset charset = getCharset();
91  		
92  		ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
93  		if (ourLog.isTraceEnabled()) {
94  			ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
95  		}
96  		
97  		String messageString = new String(bytes, charset);
98  		setMessage(messageString);
99  	}
100 
101 	private void decodeHeaders() throws DecodeException {
102 
103 		ourLog.trace("Header map contains: {}", getHeaders());
104 
105 		for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
106 			String nextHeader = nextEntry.getKey().toLowerCase();
107 			String nextValue = nextEntry.getValue();
108 
109 			ourLog.trace("Next header: {}={}", nextHeader, nextValue);
110 			
111 			if ("transfer-encoding".equals(nextHeader)) {
112 				if ("chunked".equalsIgnoreCase(nextValue)) {
113 					myTransferEncoding = TransferEncoding.CHUNKED;
114 					ourLog.trace("Found chunked transfer encoding");
115 				} else {
116 					throw new DecodeException("Unknown transfer encoding: " + nextValue);
117 				}
118 			} else if ("connection".equals(nextHeader)) {
119 				if ("close".equals(nextValue)) {
120 					myConnectionCloseHeaderIsPresent = true;
121 				}
122 			} else if ("content-length".equals(nextHeader)) {
123 				try {
124 					myContentLength = Integer.parseInt(nextValue);
125 					ourLog.trace("Found content length: {}", myContentLength);
126 				} catch (NumberFormatException e) {
127 					addConformanceProblem("Could not parse Content-Length header value: " + nextHeader);
128 				}
129 			} else if ("content-type".equals(nextHeader)) {
130 				int colonIndex = nextValue.indexOf(';');
131 				if (colonIndex == -1) {
132 					myContentType = nextValue.trim();
133 				} else {
134 					myContentType = nextValue.substring(0, colonIndex).trim();
135 					String charsetDef = nextValue.substring(colonIndex + 1).trim();
136 					if (charsetDef.startsWith("charset=")) {
137 						String charsetName = charsetDef.substring(8);
138 						Charset charset;
139 						try {
140 							charset = Charset.forName(charsetName);
141 						} catch (UnsupportedCharsetException e) {
142 							addConformanceProblem("Unsupported or invalid charset: " + charsetName);
143 							continue;
144 						}
145 						setCharset(charset);
146 					}
147 				}
148 
149 				myEncodingStyle = EncodingStyle.getEncodingStyleForContentType(myContentType);
150 				ourLog.trace("Found content type {} with resolves to encoding style {}", myContentType, myEncodingStyle);
151 
152 			} else if ("authorization".equals(nextHeader)) {
153 				int spaceIndex = nextValue.indexOf(' ');
154 				if (spaceIndex == -1) {
155 					throw new DecodeException("Invalid authorization header. No authorization style detected");
156 				}
157 				String type = nextValue.substring(0, spaceIndex);
158 				if ("basic".equalsIgnoreCase(type)) {
159 					String encodedCredentials = nextValue.substring(spaceIndex + 1);
160 					byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
161 					String credentialsString = new String(decodedCredentials, getDefaultCharset());
162 					int colonIndex = credentialsString.indexOf(':');
163 					if (colonIndex == -1) {
164 						setUsername(credentialsString);
165 					} else {
166 						setUsername(credentialsString.substring(0, colonIndex));
167 						setPassword(credentialsString.substring(colonIndex + 1));
168 					}
169 					
170 					ourLog.trace("Found authorization header with username: {}", getUsername());
171 					
172 				} else {
173 					addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
174 				}
175 				
176 			} else if ("content-encoding".equals(nextHeader)) {
177 				if (StringUtils.isNotBlank(nextValue)) {
178 					if ("gzip".equals(nextValue)) {
179 						myGzipCoding = true;
180 					} else {
181 						throw new DecodeException("Unknown Content-Encoding: " + nextValue);
182 					}
183 				}
184 				ourLog.trace("Found content coding: {}", nextValue);
185 			} else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
186 				ourLog.trace("Found signature: {}", nextValue);
187 				mySignature = nextValue;
188 			} else {
189 				ourLog.trace("Ignoring header {}={}", nextHeader, nextValue);
190 			}
191 
192 		}
193 
194 		ourLog.trace("Done processing headers");
195 
196 	}
197 
198 	/**
199 	 * Protected because this doesn't make sense for a sender
200 	 */
201 	protected boolean isConnectionCloseHeaderPresent() {
202 		return myConnectionCloseHeaderIsPresent;
203 	}
204 
205 	/**
206 	 * Returns the {@link EncodingStyle} associated with the incoming message,
207 	 * or <code>null</code>. This will be set automatically based on the value
208 	 * of the <code>Content-Type</code> header, and will be set to
209 	 * <code>null</code> if the content type is not provided, or if the content
210 	 * type does not correspond to an HL7 type.
211 	 * 
212 	 * @see {@link EncodingStyle} for a list of appropriate content types
213 	 */
214 	public EncodingStyle getEncodingStyle() {
215 		return myEncodingStyle;
216 	}
217 
218 	private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, IOException, SignatureVerificationException {
219 		decodeHeaders();
220 		authorize();
221 		if (myTransferEncoding == TransferEncoding.CHUNKED) {
222 			myBytes = readBytesChunked(theInputStream);
223 		} else {
224 			myBytes = readBytesNonChunked(theInputStream);
225 		}
226 
227 		decodeBody();
228 		
229 		if (getContentType() == null) {
230 			throw new DecodeException("Content-Type not specified");
231 		}
232 		if (getEncodingStyle() == null) {
233 			throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
234 		}
235 		
236 		verifySignature();
237 	}
238 
239 	private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
240 		ourLog.debug("Decoding message bytes using CHUNKED encoding style");
241 		byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
242 		ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);
243 
244 		while (true) {
245 			String nextSize;
246 			try {
247 				nextSize = readLine(theInputStream);
248 			} catch (IOException e) {
249 				throw new DecodeException("Failed to decode CHUNKED encoding", e);
250 			}
251 
252 			ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);
253 			
254 			if (nextSize.length() == 0) {
255 				break;
256 			}
257 
258 			int nextSizeInt;
259 			try {
260 				nextSizeInt = Integer.parseInt(nextSize, 16);
261 			} catch (NumberFormatException e) {
262 				throw new DecodeException("Failed to decode CHUNKED encoding", e);
263 			}
264 
265 			ourLog.debug("Next CHUNKED size: {}", nextSizeInt);
266 
267 			if (nextSizeInt < 0) {
268 				throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
269 			}
270 
271 			boolean trailing = false;
272 			if (nextSizeInt > 0) {
273 				int totalRead = 0;
274 				myLastStartedReading = System.currentTimeMillis();
275 				do {
276 					int nextRead = Math.min(nextSizeInt, byteBuffer.length);
277 					int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
278 					if (bytesRead == -1) {
279 						ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
280 						throw new DecodeException("Reached EOF while reading in message chunk");
281 					}
282 					if (bytesRead == 0) {
283 						pauseDuringTimedOutRead();
284 					}
285 					totalRead += bytesRead;
286 
287 					if (ourLog.isTraceEnabled()) {
288 						ourLog.trace("Read {} byte chunk: {}", bytesRead, new String(byteBuffer, 0, bytesRead));
289 					}else {
290 						ourLog.debug("Read {} byte chunk", bytesRead);
291 					}
292 					
293 					bos.write(byteBuffer, 0, bytesRead);
294 
295 				} while (totalRead < nextSizeInt);
296 			} else {
297 				trailing = true;
298 			}
299 
300 			// Try to read a trailing CRLF
301 			int nextChar;
302 			boolean had13 = false;
303 			boolean had10 = false;
304 			while (true) {
305 				try {
306 					nextChar = theInputStream.read();
307 					if (ourLog.isTraceEnabled()) {
308 						ourLog.trace("Read byte: " + (char)nextChar + " (" + nextChar + ")");
309 					}
310 				} catch (SocketTimeoutException e) {
311 					break;
312 				}
313 
314 				if (nextChar == -1) {
315 					break;
316 				} else if (nextChar == 13) {
317 					if (had13) {
318 						/* 
319 						 * This is an attempt to be tolerant of people using the wrong
320 						 * end of line sequence (it should be CRLF), as is the 
321 						 * had10 below 
322 						 */
323 						trailing = true;
324 					}
325 					had13 = true;
326 				} else if (nextChar == 10) {
327 					break;
328 				} else {
329 					break;
330 				}
331 			}
332 			
333 			if (trailing) {
334 				break;
335 			}
336 
337 		} // while
338 
339 		return bos.toByteArray();
340 	}
341 
342 	private void verifySignature() throws SignatureVerificationException, DecodeException {
343 		if (getSigner() != null && StringUtils.isBlank(mySignature)) {
344 			String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
345 			throw new SignatureVerificationException("No HL7 Signature found in " + mode);
346 		}
347 		if (getSigner() != null) {
348 			try {
349 				getSigner().verify(myBytes, mySignature);
350 			} catch (SignatureFailureException e) {
351 				throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
352 			}
353 		}
354 	}
355 
356 	public List<String> getConformanceProblems() {
357 		if (myConformanceProblems == null) {
358 			myConformanceProblems = new ArrayList<>();
359 		}
360 		return myConformanceProblems;
361 	}
362 
363 	/**
364 	 * @return Returns the content type associated with the message (e.g. application/hl7-v2)
365 	 */
366 	public String getContentType() {
367 		return myContentType;
368 	}
369 
370 	/**
371 	 * @return the responseName
372 	 */
373 	public String getResponseName() {
374 		return myResponseName;
375 	}
376 
377 	/**
378 	 * @return the responseStatus
379 	 */
380 	public Integer getResponseStatus() {
381 		return myResponseStatus;
382 	}
383 
384 	protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;
385 
386 	private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
387 		ourLog.debug("Decoding message bytes using non-chunked encoding style");
388 
389 		int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
390 		ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
391 
392 		byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
393 		myLastStartedReading = System.currentTimeMillis();
394 		while ((myContentLength < 0 || bos.size() < myContentLength)) {
395 			if (myContentLength < 0) {
396 				try {
397 					if (theInputStream.available() <= 0) {
398 						ourLog.trace("No more bytes available");
399 						break;
400 					}
401 				} catch (IOException e) {
402 					ourLog.debug("Received IOException while calling inputStream#available()", e);
403 					throw e;
404 				}
405 			}
406 
407 			int max;
408 			if (myContentLength > 0) {
409 				max = myContentLength - bos.size();
410 				max = Math.min(max, buffer.length);
411 			} else {
412 				max = buffer.length;
413 			}
414 			
415 			try {
416 				int bytesRead = theInputStream.read(buffer, 0, max);
417 				myLastStartedReading = System.currentTimeMillis();
418 				if (bytesRead == -1) {
419 					ourLog.trace("Read end of stream");
420 					break;
421 				} else {
422 					if (ourLog.isTraceEnabled()) {
423 						ourLog.trace("Read {} bytes from stream:\n{}", bytesRead, ByteUtils.formatBytesForLogging(bytesRead, 0, buffer));
424 					}
425 				}
426 				bos.write(buffer, 0, bytesRead);
427 			} catch (SocketTimeoutException e) {
428 				long elapsed = System.currentTimeMillis() - myLastStartedReading;
429 				if (elapsed > myReadTimeout) {
430 					throw e;
431 				} else {
432 					ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
433 					try {
434 						Thread.sleep(100);
435 					} catch (InterruptedException e1) {
436 						// ignore
437 					}
438 				}
439 			} catch (IOException e) {
440 				ourLog.debug("Received IOException while calling inputStream#available()", e);
441 				throw e;
442 			}
443 		}
444 
445 		return bos.toByteArray();
446 	}
447 
448 	/**
449 	 * Read in the contents of the raw message from the input stream and decode
450 	 * entire the message. This method assumes that the headers have been
451 	 * provided using {@link #setHeaders(LinkedHashMap)}
452 	 * 
453 	 * @param theInputStream
454 	 *            The inputstream to read the raw message from
455 	 * @throws AuthorizationFailureException
456 	 *             If the authorization check fails. This will only be thrown if
457 	 *             this decoder is decoding a request message, and an
458 	 *             authorization callback has been provided, and the
459 	 *             authorization fails.
460 	 * @throws DecodeException
461 	 *             If the message can not be decoded for any reason
462 	 * @throws IOException
463 	 *             If there is a failure while reading from the inputstream
464 	 * @throws SignatureVerificationException
465 	 *             If the signature verification fails. This will only occur if
466 	 *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
467 	 *             has been provided.
468 	 */
469 	public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
470 		verifyNotUsed();
471 
472 		doReadContentsFromInputStreamAndDecode(theInputStream);
473 	}
474 
475 	protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
476 		ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
477 		String retVal = readLine(theInputStream, true);
478 		ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
479 		return retVal;
480 	}
481 
482 	/**
483 	 * Note that if {@link #setPath(String)} is called, this method will assume
484 	 * that the first line of the HTTP request has already been read from the
485 	 * input stream. If {@link #setHeaders(java.util.LinkedHashMap)} has been
486 	 * called, this method will assume that the HTTP headers have already been
487 	 * read from the input stream as well as the double-LF (ASCII-10) that
488 	 * proceeds the headers.
489 	 * 
490 	 * 
491 	 * @param theInputStream
492 	 *            The inputstream to read the raw message from
493 	 * @throws AuthorizationFailureException
494 	 *             If the authorization check fails. This will only be thrown if
495 	 *             this decoder is decoding a request message, and an
496 	 *             authorization callback has been provided, and the
497 	 *             authorization fails.
498 	 * @throws DecodeException
499 	 *             If the message can not be decoded for any reason
500 	 * @throws IOException
501 	 *             If there is a failure while reading from the inputstream
502 	 * @throws SignatureVerificationException
503 	 *             If the signature verification fails. This will only occur if
504 	 *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
505 	 *             has been provided.
506 	 */
507 	public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
508 		verifyNotUsed();
509 
510 		String actionLine = readActionLineAndDecode(theInputStream);
511 
512 		ourLog.debug("Read action line: {}", actionLine);
513 
514 		if (getHeaders() == null) {
515 			setHeaders(new LinkedHashMap<>());
516 
517 			while (true) {
518 				String nextLine = readLine(theInputStream);
519 				if (nextLine.length() == 0) {
520 					break;
521 				}
522 
523 				int colonIndex = nextLine.indexOf(':');
524 				if (colonIndex == -1) {
525 					throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
526 				}
527 
528 				String key = nextLine.substring(0, colonIndex);
529 				String value = nextLine.substring(colonIndex + 1).trim();
530 				
531 				ourLog.debug("Read header {}={}", key,value);
532 				
533 				getHeaders().put(key, value);
534 			}
535 		}
536 
537 		doReadContentsFromInputStreamAndDecode(theInputStream);
538 
539 	}
540 
541 	private String readLine(InputStream theInputStream) throws IOException {
542 		try {
543 			return readLine(theInputStream, false);
544 		} catch (NoMessageReceivedException e) {
545 			throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
546 		}
547 	}
548 
549 	private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {
550 		
551 		myLastStartedReading = System.currentTimeMillis();
552 
553 		StringBuilder retVal = new StringBuilder();
554 		while (true) {
555 
556 			int b;
557 			try {
558 				b = theInputStream.read();
559 				if (ourLog.isTraceEnabled()) {
560 					ourLog.trace("Read byte: " + (char)b + " (" + b + ")");
561 				}
562 			} catch (SocketTimeoutException e) {
563 				if (retVal.length() == 0 && theFirstLine) {
564 					ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
565 					throw new NoMessageReceivedException();
566 				}
567 				ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
568 				pauseDuringTimedOutRead();
569 				continue;
570 			}
571 
572 			if (b == 13) {
573 			} else if (b == 10) {
574 				break;
575 			} else if (b == -1) {
576 				ourLog.debug("Current read line is: {}", retVal);
577 				ourLog.info("Read -1 from input stream, closing it");
578 				theInputStream.close();
579 				if (retVal.length() == 0) {
580 					throw new SocketException("Received EOF from input stream");
581 				}
582 				break;
583 			} else if (b < ' ') {
584 			} else {
585 				retVal.append((char) b);
586 			}
587 		}
588 
589 		ourLog.debug("Current read line is: {}", retVal);
590 
591 		return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
592 	}
593 
594 	private void pauseDuringTimedOutRead() throws SocketTimeoutException {
595 		long elapsed = System.currentTimeMillis() - myLastStartedReading;
596 		if (elapsed > myReadTimeout) {
597 			ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
598 			throw new SocketTimeoutException();
599 		}
600 		try {
601 			Thread.sleep(100);
602 		} catch (InterruptedException e1) {
603 			// ignore
604 		}
605 	}
606 
607 	/**
608 	 * Sets the number of milliseconds that the decoder will attempt to read
609 	 * from an InputStream before timing out and throwing an exception
610 	 */
611 	public void setReadTimeout(long theReadTimeout) {
612 		myReadTimeout = theReadTimeout;
613 	}
614 
615 	/**
616 	 * @param theResponseName
617 	 *            the responseName to set
618 	 */
619 	public void setResponseName(String theResponseName) {
620 		myResponseName = theResponseName;
621 	}
622 
623 	/**
624 	 * @param theResponseStatus
625 	 *            the responseStatus to set
626 	 */
627 	public void setResponseStatus(Integer theResponseStatus) {
628 		myResponseStatus = theResponseStatus;
629 	}
630 
631 }