1 package ca.uhn.hl7v2.hoh.encoder;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.net.SocketException;
7 import java.net.SocketTimeoutException;
8 import java.nio.charset.Charset;
9 import java.nio.charset.UnsupportedCharsetException;
10 import java.util.ArrayList;
11 import java.util.LinkedHashMap;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.regex.Pattern;
15
16 import ca.uhn.hl7v2.hoh.api.DecodeException;
17 import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
18 import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
19 import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
20 import ca.uhn.hl7v2.hoh.util.ByteUtils;
21 import ca.uhn.hl7v2.hoh.util.GZipUtils;
22 import ca.uhn.hl7v2.hoh.util.IOUtils;
23 import ca.uhn.hl7v2.hoh.util.StringUtils;
24 import ca.uhn.hl7v2.hoh.util.repackage.Base64;
25
26 public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {
27
28 private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");
29
30
31
32
33
34
35
36 public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
37
38 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);
39
40 private byte[] myBytes;
41 private List<String> myConformanceProblems;
42 private int myContentLength = -1;
43 private String myContentType;
44 private boolean myGzipCoding;
45 private long myLastStartedReading;
46 private long myReadTimeout = DEFAULT_READ_TIMEOUT;
47 private String myResponseName;
48 private Integer myResponseStatus;
49 private TransferEncoding myTransferEncoding;
50 private String mySignature;
51 private EncodingStyle myEncodingStyle;
52
53 private boolean myConnectionCloseHeaderIsPresent;
54
55 private void addConformanceProblem(String theString) {
56 ourLog.debug("Conformance problem detected: {}", theString);
57 if (myConformanceProblems == null) {
58 myConformanceProblems = new ArrayList<String>();
59 }
60 myConformanceProblems.add(theString);
61 }
62
63 protected abstract void authorize() throws AuthorizationFailureException;
64
65 public void decode() throws DecodeException, SignatureVerificationException {
66 ourLog.trace("Entering decode()");
67
68 verifyNotUsed();
69
70 decodeHeaders();
71 authorize();
72 decodeBody();
73 verifySignature();
74
75 ourLog.trace("Exiting decode()");
76 }
77
78 private void decodeBody() throws DecodeException {
79 byte[] bytes = myBytes;
80
81 if (myGzipCoding) {
82 ourLog.debug("Decoding message contents using GZIP encoding style");
83 try {
84 bytes = GZipUtils.uncompress(bytes);
85 } catch (IOException e) {
86 throw new DecodeException("Failed to uncompress GZip content", e);
87 }
88 }
89
90 Charset charset = getCharset();
91
92 ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
93 if (ourLog.isTraceEnabled()) {
94 ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
95 }
96
97 String messageString = new String(bytes, charset);
98 setMessage(messageString);
99 }
100
101 private void decodeHeaders() throws DecodeException {
102
103 ourLog.trace("Header map contains: {}", getHeaders());
104
105 for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
106 String nextHeader = nextEntry.getKey().toLowerCase();
107 String nextValue = nextEntry.getValue();
108
109 ourLog.trace("Next header: {}={}", nextHeader, nextValue);
110
111 if ("transfer-encoding".equals(nextHeader)) {
112 if ("chunked".equalsIgnoreCase(nextValue)) {
113 myTransferEncoding = TransferEncoding.CHUNKED;
114 ourLog.trace("Found chunked transfer encoding");
115 } else {
116 throw new DecodeException("Unknown transfer encoding: " + nextValue);
117 }
118 } else if ("connection".equals(nextHeader)) {
119 if ("close".equals(nextValue)) {
120 myConnectionCloseHeaderIsPresent = true;
121 }
122 } else if ("content-length".equals(nextHeader)) {
123 try {
124 myContentLength = Integer.parseInt(nextValue);
125 ourLog.trace("Found content length: {}", myContentLength);
126 } catch (NumberFormatException e) {
127 addConformanceProblem("Could not parse Content-Length header value: " + nextHeader);
128 }
129 } else if ("content-type".equals(nextHeader)) {
130 int colonIndex = nextValue.indexOf(';');
131 if (colonIndex == -1) {
132 myContentType = nextValue.trim();
133 } else {
134 myContentType = nextValue.substring(0, colonIndex).trim();
135 String charsetDef = nextValue.substring(colonIndex + 1).trim();
136 if (charsetDef.startsWith("charset=")) {
137 String charsetName = charsetDef.substring(8);
138 Charset charset;
139 try {
140 charset = Charset.forName(charsetName);
141 } catch (UnsupportedCharsetException e) {
142 addConformanceProblem("Unsupported or invalid charset: " + charsetName);
143 continue;
144 }
145 setCharset(charset);
146 }
147 }
148
149 myEncodingStyle = EncodingStyle.getEncodingStyleForContentType(myContentType);
150 ourLog.trace("Found content type {} with resolves to encoding style {}", myContentType, myEncodingStyle);
151
152 } else if ("authorization".equals(nextHeader)) {
153 int spaceIndex = nextValue.indexOf(' ');
154 if (spaceIndex == -1) {
155 throw new DecodeException("Invalid authorization header. No authorization style detected");
156 }
157 String type = nextValue.substring(0, spaceIndex);
158 if ("basic".equalsIgnoreCase(type)) {
159 String encodedCredentials = nextValue.substring(spaceIndex + 1);
160 byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
161 String credentialsString = new String(decodedCredentials, getDefaultCharset());
162 int colonIndex = credentialsString.indexOf(':');
163 if (colonIndex == -1) {
164 setUsername(credentialsString);
165 } else {
166 setUsername(credentialsString.substring(0, colonIndex));
167 setPassword(credentialsString.substring(colonIndex + 1));
168 }
169
170 ourLog.trace("Found authorization header with username: {}", getUsername());
171
172 } else {
173 addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
174 }
175
176 } else if ("content-encoding".equals(nextHeader)) {
177 if (StringUtils.isNotBlank(nextValue)) {
178 if ("gzip".equals(nextValue)) {
179 myGzipCoding = true;
180 } else {
181 throw new DecodeException("Unknown Content-Encoding: " + nextValue);
182 }
183 }
184 ourLog.trace("Found content coding: {}", nextValue);
185 } else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
186 ourLog.trace("Found signature: {}", nextValue);
187 mySignature = nextValue;
188 } else {
189 ourLog.trace("Ignoring header {}={}", nextHeader, nextValue);
190 }
191
192 }
193
194 ourLog.trace("Done processing headers");
195
196 }
197
198
199
200
201 protected boolean isConnectionCloseHeaderPresent() {
202 return myConnectionCloseHeaderIsPresent;
203 }
204
205
206
207
208
209
210
211
212
213
214 public EncodingStyle getEncodingStyle() {
215 return myEncodingStyle;
216 }
217
218 private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, AuthorizationFailureException, IOException, SignatureVerificationException {
219 decodeHeaders();
220 authorize();
221 if (myTransferEncoding == TransferEncoding.CHUNKED) {
222 myBytes = readBytesChunked(theInputStream);
223 } else {
224 myBytes = readBytesNonChunked(theInputStream);
225 }
226
227 decodeBody();
228
229 if (getContentType() == null) {
230 throw new DecodeException("Content-Type not specified");
231 }
232 if (getEncodingStyle() == null) {
233 throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
234 }
235
236 verifySignature();
237 }
238
239 private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
240 ourLog.debug("Decoding message bytes using CHUNKED encoding style");
241 byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
242 ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);
243
244 while (true) {
245 String nextSize;
246 try {
247 nextSize = readLine(theInputStream);
248 } catch (IOException e) {
249 throw new DecodeException("Failed to decode CHUNKED encoding", e);
250 }
251
252 ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);
253
254 if (nextSize.length() == 0) {
255 break;
256 }
257
258 int nextSizeInt;
259 try {
260 nextSizeInt = Integer.parseInt(nextSize, 16);
261 } catch (NumberFormatException e) {
262 throw new DecodeException("Failed to decode CHUNKED encoding", e);
263 }
264
265 ourLog.debug("Next CHUNKED size: {}", nextSizeInt);
266
267 if (nextSizeInt < 0) {
268 throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
269 }
270
271 boolean trailing = false;
272 if (nextSizeInt > 0) {
273 int totalRead = 0;
274 myLastStartedReading = System.currentTimeMillis();
275 do {
276 int nextRead = Math.min(nextSizeInt, byteBuffer.length);
277 int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
278 if (bytesRead == -1) {
279 ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
280 throw new DecodeException("Reached EOF while reading in message chunk");
281 }
282 if (bytesRead == 0 && totalRead < nextSizeInt) {
283 pauseDuringTimedOutRead();
284 }
285 totalRead += bytesRead;
286
287 if (ourLog.isTraceEnabled()) {
288 ourLog.trace("Read {} byte chunk: {}", bytesRead, new String(byteBuffer, 0, bytesRead));
289 }else {
290 ourLog.debug("Read {} byte chunk", bytesRead);
291 }
292
293 bos.write(byteBuffer, 0, bytesRead);
294
295 } while (totalRead < nextSizeInt);
296 } else {
297 trailing = true;
298 }
299
300
301 int nextChar;
302 boolean had13 = false;
303 boolean had10 = false;
304 while (true) {
305 try {
306 nextChar = theInputStream.read();
307 if (ourLog.isTraceEnabled()) {
308 ourLog.trace("Read byte: " + (char)nextChar + " (" + nextChar + ")");
309 }
310 } catch (SocketTimeoutException e) {
311 break;
312 }
313
314 if (nextChar == -1) {
315 break;
316 } else if (nextChar == 13) {
317 if (had13) {
318
319
320
321
322
323 trailing = true;
324 }
325 had13 = true;
326 continue;
327 } else if (nextChar == 10) {
328 if (had10) {
329 trailing = true;
330 }
331 break;
332 } else {
333 break;
334 }
335 }
336
337 if (trailing) {
338 break;
339 }
340
341 }
342
343 return bos.toByteArray();
344 }
345
346 private void verifySignature() throws SignatureVerificationException, DecodeException {
347 if (getSigner() != null && StringUtils.isBlank(mySignature)) {
348 String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
349 throw new SignatureVerificationException("No HL7 Signature found in " + mode);
350 }
351 if (getSigner() != null) {
352 try {
353 getSigner().verify(myBytes, mySignature);
354 } catch (SignatureFailureException e) {
355 throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
356 }
357 }
358 }
359
360 public List<String> getConformanceProblems() {
361 if (myConformanceProblems == null) {
362 myConformanceProblems = new ArrayList<String>();
363 }
364 return myConformanceProblems;
365 }
366
367
368
369
370 public String getContentType() {
371 return myContentType;
372 }
373
374
375
376
377 public String getResponseName() {
378 return myResponseName;
379 }
380
381
382
383
384 public Integer getResponseStatus() {
385 return myResponseStatus;
386 }
387
388 protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;
389
390 private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
391 ourLog.debug("Decoding message bytes using non-chunked encoding style");
392
393 int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
394 ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
395
396 byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
397 myLastStartedReading = System.currentTimeMillis();
398 while ((myContentLength < 0 || bos.size() < myContentLength)) {
399 if (myContentLength < 0) {
400 try {
401 if (theInputStream.available() <= 0) {
402 ourLog.trace("No more bytes available");
403 break;
404 }
405 } catch (IOException e) {
406 ourLog.debug("Received IOException while calling inputStream#available()", e);
407 throw e;
408 }
409 }
410
411 int max;
412 if (myContentLength > 0) {
413 max = myContentLength - bos.size();
414 max = Math.min(max, buffer.length);
415 } else {
416 max = buffer.length;
417 }
418
419 try {
420 int bytesRead = theInputStream.read(buffer, 0, max);
421 myLastStartedReading = System.currentTimeMillis();
422 if (bytesRead == -1) {
423 ourLog.trace("Read end of stream");
424 break;
425 } else {
426 if (ourLog.isTraceEnabled()) {
427 ourLog.trace("Read {} bytes from stream:\n{}", bytesRead, ByteUtils.formatBytesForLogging(bytesRead, 0, buffer));
428 }
429 }
430 bos.write(buffer, 0, bytesRead);
431 } catch (SocketTimeoutException e) {
432 long elapsed = System.currentTimeMillis() - myLastStartedReading;
433 if (elapsed > myReadTimeout) {
434 throw e;
435 } else {
436 ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
437 try {
438 Thread.sleep(100);
439 } catch (InterruptedException e1) {
440
441 }
442 }
443 } catch (IOException e) {
444 ourLog.debug("Received IOException while calling inputStream#available()", e);
445 throw e;
446 }
447 }
448
449 return bos.toByteArray();
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473 public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
474 verifyNotUsed();
475
476 doReadContentsFromInputStreamAndDecode(theInputStream);
477 }
478
479 protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
480 ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
481 String retVal = readLine(theInputStream, true);
482 ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
483 return retVal;
484 }
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511 public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
512 verifyNotUsed();
513
514 String actionLine = readActionLineAndDecode(theInputStream);
515
516 ourLog.debug("Read action line: {}", actionLine);
517
518 if (getHeaders() == null) {
519 setHeaders(new LinkedHashMap<String, String>());
520
521 while (true) {
522 String nextLine = readLine(theInputStream);
523 if (nextLine.length() == 0) {
524 break;
525 }
526
527 int colonIndex = nextLine.indexOf(':');
528 if (colonIndex == -1) {
529 throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
530 }
531
532 String key = nextLine.substring(0, colonIndex);
533 String value = nextLine.substring(colonIndex + 1).trim();
534
535 ourLog.debug("Read header {}={}", key,value);
536
537 getHeaders().put(key, value);
538 }
539 }
540
541 doReadContentsFromInputStreamAndDecode(theInputStream);
542
543 }
544
545 private String readLine(InputStream theInputStream) throws IOException {
546 try {
547 return readLine(theInputStream, false);
548 } catch (NoMessageReceivedException e) {
549 throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
550 }
551 }
552
553 private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {
554
555 myLastStartedReading = System.currentTimeMillis();
556
557 StringBuilder retVal = new StringBuilder();
558 while (true) {
559
560 int b;
561 try {
562 b = theInputStream.read();
563 if (ourLog.isTraceEnabled()) {
564 ourLog.trace("Read byte: " + (char)b + " (" + b + ")");
565 }
566 } catch (SocketTimeoutException e) {
567 if (retVal.length() == 0 && theFirstLine) {
568 ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
569 throw new NoMessageReceivedException();
570 }
571 ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
572 pauseDuringTimedOutRead();
573 continue;
574 }
575
576 if (b == 13) {
577 continue;
578 } else if (b == 10) {
579 break;
580 } else if (b == -1) {
581 ourLog.debug("Current read line is: {}", retVal);
582 ourLog.info("Read -1 from input stream, closing it");
583 theInputStream.close();
584 if (retVal.length() == 0) {
585 throw new SocketException("Received EOF from input stream");
586 }
587 break;
588 } else if (b < ' ') {
589 continue;
590 } else {
591 retVal.append((char) b);
592 }
593 }
594
595 ourLog.debug("Current read line is: {}", retVal);
596
597 return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
598 }
599
600 private void pauseDuringTimedOutRead() throws SocketTimeoutException {
601 long elapsed = System.currentTimeMillis() - myLastStartedReading;
602 if (elapsed > myReadTimeout) {
603 ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
604 throw new SocketTimeoutException();
605 }
606 try {
607 Thread.sleep(100);
608 } catch (InterruptedException e1) {
609
610 }
611 }
612
613
614
615
616
617 public void setReadTimeout(long theReadTimeout) {
618 myReadTimeout = theReadTimeout;
619 }
620
621
622
623
624
625 public void setResponseName(String theResponseName) {
626 myResponseName = theResponseName;
627 }
628
629
630
631
632
633 public void setResponseStatus(Integer theResponseStatus) {
634 myResponseStatus = theResponseStatus;
635 }
636
637 }