package ca.uhn.hl7v2.hoh.encoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

import ca.uhn.hl7v2.hoh.api.DecodeException;
import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
import ca.uhn.hl7v2.hoh.util.ByteUtils;
import ca.uhn.hl7v2.hoh.util.GZipUtils;
import ca.uhn.hl7v2.hoh.util.IOUtils;
import ca.uhn.hl7v2.hoh.util.StringUtils;
import ca.uhn.hl7v2.hoh.util.repackage.Base64;

/**
 * Base class for decoders of HL7 over HTTP messages. It interprets the HTTP
 * headers (transfer encoding, content length, content type / charset, basic
 * authorization, content encoding and the HL7 signature header), reads the
 * message body from an {@link InputStream} using either chunked or
 * non-chunked transfer, optionally un-gzips it, converts it to a String and
 * verifies the signature if a signer has been configured.
 */
public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {

    private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");

    /** Prefix of the charset parameter within a Content-Type header value */
    private static final String CHARSET_PARAM_PREFIX = "charset=";

    /**
     * Default amount of time that the decoder will attempt to read before
     * timing out and throwing an IOException (30000ms)
     *
     * @see #setReadTimeout(long)
     */
    public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;

    private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);

    private byte[] myBytes;
    private List<String> myConformanceProblems;
    private int myContentLength = -1;
    private String myContentType;
    private boolean myGzipCoding;
    private long myLastStartedReading;
    private long myReadTimeout = DEFAULT_READ_TIMEOUT;
    private String myResponseName;
    private Integer myResponseStatus;
    private TransferEncoding myTransferEncoding;
    private String mySignature;
    private EncodingStyle myEncodingStyle;

    private boolean myConnectionCloseHeaderIsPresent;

    /**
     * Records a non-fatal protocol problem so that it can later be retrieved
     * using {@link #getConformanceProblems()}. The list is created lazily.
     */
    private void addConformanceProblem(String theString) {
        ourLog.debug("Conformance problem detected: {}", theString);
        if (myConformanceProblems == null) {
            myConformanceProblems = new ArrayList<String>();
        }
        myConformanceProblems.add(theString);
    }

    /**
     * Hook invoked after the headers have been decoded and before the body is
     * processed, allowing the subclass to reject the message.
     *
     * @throws AuthorizationFailureException
     *             If the subclass decides the message is not authorized
     */
    protected abstract void authorize() throws AuthorizationFailureException;

    /**
     * Decodes the headers, authorizes, decodes the body and verifies the
     * signature, in that order.
     *
     * @throws DecodeException
     *             If the message can not be decoded for any reason
     * @throws SignatureVerificationException
     *             If a signer has been provided and the signature is missing
     *             or does not verify
     */
    public void decode() throws DecodeException, SignatureVerificationException {
        ourLog.trace("Entering decode()");

        verifyNotUsed();

        decodeHeaders();
        authorize();
        decodeBody();
        verifySignature();

        ourLog.trace("Exiting decode()");
    }

    /**
     * Converts the raw body bytes into the message string, un-gzipping them
     * first if a gzip Content-Encoding header was seen.
     *
     * @throws DecodeException
     *             If no body bytes are available or the gzip content can not
     *             be uncompressed
     */
    private void decodeBody() throws DecodeException {
        byte[] bytes = myBytes;

        // Fail with the declared exception type instead of a NullPointerException
        // when nothing has been read into the decoder yet
        if (bytes == null) {
            throw new DecodeException("No message content is available to decode");
        }

        if (myGzipCoding) {
            ourLog.debug("Decoding message contents using GZIP encoding style");
            try {
                bytes = GZipUtils.uncompress(bytes);
            } catch (IOException e) {
                throw new DecodeException("Failed to uncompress GZip content", e);
            }
        }

        Charset charset = getCharset();

        ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
        if (ourLog.isTraceEnabled()) {
            ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
        }

        String messageString = new String(bytes, charset);
        setMessage(messageString);
    }

    /**
     * Walks the header map returned by {@link #getHeaders()} and extracts the
     * values this decoder cares about. Unrecognized headers are ignored.
     *
     * @throws DecodeException
     *             If a header carries a value that can not be handled (unknown
     *             transfer encoding, unknown content encoding, malformed
     *             authorization header)
     */
    private void decodeHeaders() throws DecodeException {

        ourLog.trace("Header map contains: {}", getHeaders());

        for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
            // Header names are ASCII and case-insensitive; Locale.ROOT keeps the
            // lower-casing independent of the JVM default locale (e.g. Turkish "I")
            String nextHeader = nextEntry.getKey().toLowerCase(Locale.ROOT);
            String nextValue = nextEntry.getValue();

            ourLog.trace("Next header: {}={}", nextHeader, nextValue);

            if ("transfer-encoding".equals(nextHeader)) {
                if ("chunked".equalsIgnoreCase(nextValue)) {
                    myTransferEncoding = TransferEncoding.CHUNKED;
                    ourLog.trace("Found chunked transfer encoding");
                } else {
                    throw new DecodeException("Unknown transfer encoding: " + nextValue);
                }
            } else if ("connection".equals(nextHeader)) {
                // Connection options are case-insensitive in HTTP
                if ("close".equalsIgnoreCase(nextValue)) {
                    myConnectionCloseHeaderIsPresent = true;
                }
            } else if ("content-length".equals(nextHeader)) {
                try {
                    myContentLength = Integer.parseInt(nextValue);
                    ourLog.trace("Found content length: {}", myContentLength);
                } catch (NumberFormatException e) {
                    // Report the offending value (not the header name)
                    addConformanceProblem("Could not parse Content-Length header value: " + nextValue);
                }
            } else if ("content-type".equals(nextHeader)) {
                decodeContentTypeHeader(nextValue);
            } else if ("authorization".equals(nextHeader)) {
                decodeAuthorizationHeader(nextValue);
            } else if ("content-encoding".equals(nextHeader)) {
                if (StringUtils.isNotBlank(nextValue)) {
                    // Content-coding values are case-insensitive in HTTP
                    if ("gzip".equalsIgnoreCase(nextValue)) {
                        myGzipCoding = true;
                    } else {
                        throw new DecodeException("Unknown Content-Encoding: " + nextValue);
                    }
                }
                ourLog.trace("Found content coding: {}", nextValue);
            } else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
                ourLog.trace("Found signature: {}", nextValue);
                mySignature = nextValue;
            } else {
                ourLog.trace("Ignoring header {}={}", nextHeader, nextValue);
            }

        }

        ourLog.trace("Done processing headers");

    }

    /**
     * Extracts the media type, the optional charset parameter and the
     * resulting {@link EncodingStyle} from a Content-Type header value.
     * <p>
     * If a charset is declared but is not usable (either an
     * {@link UnsupportedCharsetException} or an illegal charset name), a
     * conformance problem is recorded and the encoding style is left
     * untouched, exactly as the unsupported-charset case has always behaved.
     * </p>
     */
    private void decodeContentTypeHeader(String theValue) {
        int semicolonIndex = theValue.indexOf(';');
        if (semicolonIndex == -1) {
            myContentType = theValue.trim();
        } else {
            myContentType = theValue.substring(0, semicolonIndex).trim();
            String charsetDef = theValue.substring(semicolonIndex + 1).trim();
            // Parameter names are case-insensitive, so accept e.g. "Charset=" too
            if (charsetDef.regionMatches(true, 0, CHARSET_PARAM_PREFIX, 0, CHARSET_PARAM_PREFIX.length())) {
                String charsetName = charsetDef.substring(CHARSET_PARAM_PREFIX.length());
                Charset charset;
                try {
                    charset = Charset.forName(charsetName);
                } catch (IllegalArgumentException e) {
                    // Covers both UnsupportedCharsetException and
                    // IllegalCharsetNameException (both are IllegalArgumentExceptions)
                    addConformanceProblem("Unsupported or invalid charset: " + charsetName);
                    return;
                }
                setCharset(charset);
            }
        }

        myEncodingStyle = EncodingStyle.getEncodingStyleForContentType(myContentType);
        ourLog.trace("Found content type {} with resolves to encoding style {}", myContentType, myEncodingStyle);
    }

    /**
     * Extracts the username and password from a basic Authorization header
     * value. Any other authorization type is recorded as a conformance
     * problem.
     *
     * @throws DecodeException
     *             If the value contains no space separating the authorization
     *             type from the credentials
     */
    private void decodeAuthorizationHeader(String theValue) throws DecodeException {
        int spaceIndex = theValue.indexOf(' ');
        if (spaceIndex == -1) {
            throw new DecodeException("Invalid authorization header. No authorization style detected");
        }
        String type = theValue.substring(0, spaceIndex);
        if ("basic".equalsIgnoreCase(type)) {
            String encodedCredentials = theValue.substring(spaceIndex + 1);
            byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
            String credentialsString = new String(decodedCredentials, getDefaultCharset());
            int colonIndex = credentialsString.indexOf(':');
            if (colonIndex == -1) {
                setUsername(credentialsString);
            } else {
                setUsername(credentialsString.substring(0, colonIndex));
                setPassword(credentialsString.substring(colonIndex + 1));
            }

            ourLog.trace("Found authorization header with username: {}", getUsername());

        } else {
            addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
        }
    }

    /**
     * Protected because this doesn't make sense for a sender
     *
     * @return <code>true</code> if a <code>Connection: close</code> header was
     *         found while decoding the headers
     */
    protected boolean isConnectionCloseHeaderPresent() {
        return myConnectionCloseHeaderIsPresent;
    }

    /**
     * Returns the {@link EncodingStyle} associated with the incoming message,
     * or <code>null</code>. This will be set automatically based on the value
     * of the <code>Content-Type</code> header, and will be set to
     * <code>null</code> if the content type is not provided, or if the content
     * type does not correspond to an HL7 type.
     *
     * @see EncodingStyle
     */
    public EncodingStyle getEncodingStyle() {
        return myEncodingStyle;
    }

    /**
     * Shared implementation behind the public read-and-decode methods: decodes
     * the headers, authorizes, reads the body using the transfer encoding
     * found in the headers, decodes it, checks that an HL7 content type was
     * supplied and finally verifies the signature.
     */
    private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, AuthorizationFailureException, IOException, SignatureVerificationException {
        decodeHeaders();
        authorize();
        if (myTransferEncoding == TransferEncoding.CHUNKED) {
            myBytes = readBytesChunked(theInputStream);
        } else {
            myBytes = readBytesNonChunked(theInputStream);
        }

        decodeBody();

        if (getContentType() == null) {
            throw new DecodeException("Content-Type not specified");
        }
        if (getEncodingStyle() == null) {
            throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
        }

        verifySignature();
    }

    /**
     * Reads a body that was sent using chunked transfer encoding and returns
     * the concatenated chunk data.
     *
     * @throws DecodeException
     *             If a chunk size line can not be read or parsed, or the
     *             stream ends in the middle of a chunk
     * @throws IOException
     *             If reading chunk data from the stream fails
     */
    private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
        ourLog.debug("Decoding message bytes using CHUNKED encoding style");
        byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
        ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);

        while (true) {
            String nextSize;
            try {
                nextSize = readLine(theInputStream);
            } catch (IOException e) {
                throw new DecodeException("Failed to decode CHUNKED encoding", e);
            }

            ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);

            if (nextSize.length() == 0) {
                break;
            }

            // A chunk size may be followed by ";extension" data which a
            // recipient is required to ignore if it does not understand it
            String sizeToken = nextSize;
            int extensionIndex = sizeToken.indexOf(';');
            if (extensionIndex != -1) {
                sizeToken = sizeToken.substring(0, extensionIndex).trim();
            }

            int nextSizeInt;
            try {
                nextSizeInt = Integer.parseInt(sizeToken, 16);
            } catch (NumberFormatException e) {
                throw new DecodeException("Failed to decode CHUNKED encoding", e);
            }

            ourLog.debug("Next CHUNKED size: {}", nextSizeInt);

            if (nextSizeInt < 0) {
                throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
            }

            boolean trailing = false;
            if (nextSizeInt > 0) {
                int totalRead = 0;
                myLastStartedReading = System.currentTimeMillis();
                do {
                    // Never ask for more than what is left of THIS chunk, otherwise a
                    // short read followed by a full-size read would swallow the chunk
                    // terminator and the start of the next chunk
                    int nextRead = Math.min(nextSizeInt - totalRead, byteBuffer.length);
                    int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
                    if (bytesRead == -1) {
                        ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
                        throw new DecodeException("Reached EOF while reading in message chunk");
                    }
                    if (bytesRead == 0 && totalRead < nextSizeInt) {
                        pauseDuringTimedOutRead();
                    }
                    totalRead += bytesRead;

                    if (ourLog.isTraceEnabled()) {
                        ourLog.trace("Read {} byte chunk: {}", bytesRead, new String(byteBuffer, 0, bytesRead));
                    } else {
                        ourLog.debug("Read {} byte chunk", bytesRead);
                    }

                    bos.write(byteBuffer, 0, bytesRead);

                } while (totalRead < nextSizeInt);
            } else {
                // A zero-length chunk marks the end of the body
                trailing = true;
            }

            // Try to read a trailing CRLF
            boolean had13 = false;
            while (true) {
                int nextChar;
                try {
                    nextChar = theInputStream.read();
                    if (ourLog.isTraceEnabled()) {
                        ourLog.trace("Read byte: " + (char) nextChar + " (" + nextChar + ")");
                    }
                } catch (SocketTimeoutException e) {
                    break;
                }

                if (nextChar == 13) {
                    if (had13) {
                        /*
                         * This is an attempt to be tolerant of people using the
                         * wrong end of line sequence (it should be CRLF): a
                         * second CR in a row is treated as the end of the body
                         */
                        trailing = true;
                    }
                    had13 = true;
                    continue;
                }

                // LF, EOF or any other byte ends the search for the terminator
                break;
            }

            if (trailing) {
                break;
            }

        } // while

        return bos.toByteArray();
    }

    /**
     * Verifies the HL7 signature header against the raw body bytes. Does
     * nothing if no signer has been configured.
     *
     * @throws SignatureVerificationException
     *             If a signer is configured but no signature header was
     *             received, or the signer rejects the signature
     * @throws DecodeException
     *             If the signer fails with a {@link SignatureFailureException}
     */
    private void verifySignature() throws SignatureVerificationException, DecodeException {
        if (getSigner() != null && StringUtils.isBlank(mySignature)) {
            String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
            throw new SignatureVerificationException("No HL7 Signature found in " + mode);
        }
        if (getSigner() != null) {
            try {
                getSigner().verify(myBytes, mySignature);
            } catch (SignatureFailureException e) {
                throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
            }
        }
    }

    /**
     * @return Returns the list of non-fatal conformance problems detected
     *         while decoding. Never <code>null</code>; the list is created on
     *         first access if no problem has been recorded yet.
     */
    public List<String> getConformanceProblems() {
        if (myConformanceProblems == null) {
            myConformanceProblems = new ArrayList<String>();
        }
        return myConformanceProblems;
    }

    /**
     * @return Returns the content type associated with the message (e.g. application/hl7-v2)
     */
    public String getContentType() {
        return myContentType;
    }

    /**
     * @return the responseName
     */
    public String getResponseName() {
        return myResponseName;
    }

    /**
     * @return the responseStatus
     */
    public Integer getResponseStatus() {
        return myResponseStatus;
    }

    /**
     * Reads and interprets the first line of the HTTP message. Implemented by
     * the subclass.
     *
     * @return The action line, which is logged by
     *         {@link #readHeadersAndContentsFromInputStreamAndDecode(InputStream)}
     */
    protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;

    /**
     * Reads a body that was not sent using chunked transfer encoding. If a
     * content length is known, reading continues until that many bytes have
     * been received (or the stream ends); otherwise reading continues for as
     * long as the stream reports available bytes.
     *
     * @throws IOException
     *             If the stream fails, or a socket timeout persists for longer
     *             than the configured read timeout
     */
    private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
        ourLog.debug("Decoding message bytes using non-chunked encoding style");

        int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
        ByteArrayOutputStream bos = new ByteArrayOutputStream(length);

        byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
        myLastStartedReading = System.currentTimeMillis();
        while ((myContentLength < 0 || bos.size() < myContentLength)) {
            if (myContentLength < 0) {
                try {
                    if (theInputStream.available() <= 0) {
                        ourLog.trace("No more bytes available");
                        break;
                    }
                } catch (IOException e) {
                    ourLog.debug("Received IOException while calling inputStream#available()", e);
                    throw e;
                }
            }

            int max;
            if (myContentLength > 0) {
                max = myContentLength - bos.size();
                max = Math.min(max, buffer.length);
            } else {
                max = buffer.length;
            }

            try {
                int bytesRead = theInputStream.read(buffer, 0, max);
                myLastStartedReading = System.currentTimeMillis();
                if (bytesRead == -1) {
                    ourLog.trace("Read end of stream");
                    break;
                } else {
                    if (ourLog.isTraceEnabled()) {
                        ourLog.trace("Read {} bytes from stream:\n{}", bytesRead, ByteUtils.formatBytesForLogging(bytesRead, 0, buffer));
                    }
                }
                bos.write(buffer, 0, bytesRead);
            } catch (SocketTimeoutException e) {
                long elapsed = System.currentTimeMillis() - myLastStartedReading;
                if (elapsed > myReadTimeout) {
                    throw e;
                } else {
                    ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {
                        // Keep the interrupt visible to whoever owns this thread
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                ourLog.debug("Received IOException while calling inputStream#read()", e);
                throw e;
            }
        }

        return bos.toByteArray();
    }

    /**
     * Read in the contents of the raw message from the input stream and decode
     * the entire message. This method assumes that the headers have been
     * provided using {@link #setHeaders(LinkedHashMap)}
     *
     * @param theInputStream
     *            The inputstream to read the raw message from
     * @throws AuthorizationFailureException
     *             If the authorization check fails. This will only be thrown if
     *             this decoder is decoding a request message, and an
     *             authorization callback has been provided, and the
     *             authorization fails.
     * @throws DecodeException
     *             If the message can not be decoded for any reason
     * @throws IOException
     *             If there is a failure while reading from the inputstream
     * @throws SignatureVerificationException
     *             If the signature verification fails. This will only occur if
     *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
     *             has been provided.
     */
    public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
        verifyNotUsed();

        doReadContentsFromInputStreamAndDecode(theInputStream);
    }

    /**
     * Reads the first line of a message from the stream. Unlike other lines, a
     * socket timeout before any character has been received is reported as a
     * {@link NoMessageReceivedException}.
     */
    protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
        ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
        String retVal = readLine(theInputStream, true);
        ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
        return retVal;
    }

    /**
     * Note that if {@link #setPath(String)} is called, this method will assume
     * that the first line of the HTTP request has already been read from the
     * input stream. If {@link #setHeaders(java.util.LinkedHashMap)} has been
     * called, this method will assume that the HTTP headers have already been
     * read from the input stream as well as the double-LF (ASCII-10) that
     * follows the headers.
     *
     *
     * @param theInputStream
     *            The inputstream to read the raw message from
     * @throws AuthorizationFailureException
     *             If the authorization check fails. This will only be thrown if
     *             this decoder is decoding a request message, and an
     *             authorization callback has been provided, and the
     *             authorization fails.
     * @throws DecodeException
     *             If the message can not be decoded for any reason
     * @throws IOException
     *             If there is a failure while reading from the inputstream
     * @throws SignatureVerificationException
     *             If the signature verification fails. This will only occur if
     *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
     *             has been provided.
     */
    public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
        verifyNotUsed();

        String actionLine = readActionLineAndDecode(theInputStream);

        ourLog.debug("Read action line: {}", actionLine);

        if (getHeaders() == null) {
            setHeaders(new LinkedHashMap<String, String>());

            while (true) {
                String nextLine = readLine(theInputStream);
                // An empty line separates the headers from the body
                if (nextLine.length() == 0) {
                    break;
                }

                int colonIndex = nextLine.indexOf(':');
                if (colonIndex == -1) {
                    throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
                }

                String key = nextLine.substring(0, colonIndex);
                String value = nextLine.substring(colonIndex + 1).trim();

                ourLog.debug("Read header {}={}", key, value);

                getHeaders().put(key, value);
            }
        }

        doReadContentsFromInputStreamAndDecode(theInputStream);

    }

    /**
     * Reads a line which is not the first line of the message, so a
     * {@link NoMessageReceivedException} can not legitimately occur.
     */
    private String readLine(InputStream theInputStream) throws IOException {
        try {
            return readLine(theInputStream, false);
        } catch (NoMessageReceivedException e) {
            throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
        }
    }

    /**
     * Reads bytes up to the next LF (ASCII-10) or the end of the stream and
     * returns them as a String with runs of whitespace collapsed to a single
     * space and leading/trailing whitespace removed. CR (ASCII-13) and all
     * other bytes below the space character are skipped.
     *
     * @param theFirstLine
     *            If <code>true</code>, a socket timeout before any character
     *            has been read results in a {@link NoMessageReceivedException}
     *            instead of waiting and retrying
     * @throws SocketException
     *             If the stream ends before any character of the line was read
     *             (the stream is closed in that case)
     */
    private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {

        myLastStartedReading = System.currentTimeMillis();

        StringBuilder retVal = new StringBuilder();
        while (true) {

            int b;
            try {
                b = theInputStream.read();
                if (ourLog.isTraceEnabled()) {
                    ourLog.trace("Read byte: " + (char) b + " (" + b + ")");
                }
            } catch (SocketTimeoutException e) {
                if (retVal.length() == 0 && theFirstLine) {
                    ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
                    throw new NoMessageReceivedException();
                }
                ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
                pauseDuringTimedOutRead();
                continue;
            }

            if (b == 13) {
                continue;
            } else if (b == 10) {
                break;
            } else if (b == -1) {
                ourLog.debug("Current read line is: {}", retVal);
                ourLog.info("Read -1 from input stream, closing it");
                theInputStream.close();
                if (retVal.length() == 0) {
                    throw new SocketException("Received EOF from input stream");
                }
                break;
            } else if (b < ' ') {
                continue;
            } else {
                retVal.append((char) b);
            }
        }

        ourLog.debug("Current read line is: {}", retVal);

        return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
    }

    /**
     * Called when a read yielded nothing: gives up with a
     * {@link SocketTimeoutException} if the read timeout has elapsed since
     * reading last started, otherwise sleeps briefly so the caller can retry.
     */
    private void pauseDuringTimedOutRead() throws SocketTimeoutException {
        long elapsed = System.currentTimeMillis() - myLastStartedReading;
        if (elapsed > myReadTimeout) {
            ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
            throw new SocketTimeoutException();
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e1) {
            // Keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sets the number of milliseconds that the decoder will attempt to read
     * from an InputStream before timing out and throwing an exception
     */
    public void setReadTimeout(long theReadTimeout) {
        myReadTimeout = theReadTimeout;
    }

    /**
     * @param theResponseName
     *            the responseName to set
     */
    public void setResponseName(String theResponseName) {
        myResponseName = theResponseName;
    }

    /**
     * @param theResponseStatus
     *            the responseStatus to set
     */
    public void setResponseStatus(Integer theResponseStatus) {
        myResponseStatus = theResponseStatus;
    }

}