1 package ca.uhn.hl7v2.hoh.encoder;
2
3 import java.io.ByteArrayOutputStream;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.net.SocketException;
7 import java.net.SocketTimeoutException;
8 import java.nio.charset.Charset;
9 import java.nio.charset.UnsupportedCharsetException;
10 import java.util.ArrayList;
11 import java.util.LinkedHashMap;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.regex.Pattern;
15
16 import ca.uhn.hl7v2.hoh.api.DecodeException;
17 import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
18 import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
19 import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
20 import ca.uhn.hl7v2.hoh.util.ByteUtils;
21 import ca.uhn.hl7v2.hoh.util.GZipUtils;
22 import ca.uhn.hl7v2.hoh.util.IOUtils;
23 import ca.uhn.hl7v2.hoh.util.StringUtils;
24 import ca.uhn.hl7v2.hoh.util.repackage.Base64;
25
26 public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {
27
28 private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");
29
30
31
32
33
34
35
36 public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
37
38 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);
39
40 private byte[] myBytes;
41 private List<String> myConformanceProblems;
42 private int myContentLength = -1;
43 private String myContentType;
44 private boolean myGzipCoding;
45 private long myLastStartedReading;
46 private long myReadTimeout = DEFAULT_READ_TIMEOUT;
47 private String myResponseName;
48 private Integer myResponseStatus;
49 private TransferEncoding myTransferEncoding;
50 private String mySignature;
51 private EncodingStyle myEncodingStyle;
52
53 private boolean myConnectionCloseHeaderIsPresent;
54
55 private void addConformanceProblem(String theString) {
56 ourLog.debug("Conformance problem detected: {}", theString);
57 if (myConformanceProblems == null) {
58 myConformanceProblems = new ArrayList<>();
59 }
60 myConformanceProblems.add(theString);
61 }
62
63 protected abstract void authorize() throws AuthorizationFailureException;
64
65 public void decode() throws DecodeException, SignatureVerificationException {
66 ourLog.trace("Entering decode()");
67
68 verifyNotUsed();
69
70 decodeHeaders();
71 authorize();
72 decodeBody();
73 verifySignature();
74
75 ourLog.trace("Exiting decode()");
76 }
77
78 private void decodeBody() throws DecodeException {
79 byte[] bytes = myBytes;
80
81 if (myGzipCoding) {
82 ourLog.debug("Decoding message contents using GZIP encoding style");
83 try {
84 bytes = GZipUtils.uncompress(bytes);
85 } catch (IOException e) {
86 throw new DecodeException("Failed to uncompress GZip content", e);
87 }
88 }
89
90 Charset charset = getCharset();
91
92 ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
93 if (ourLog.isTraceEnabled()) {
94 ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
95 }
96
97 String messageString = new String(bytes, charset);
98 setMessage(messageString);
99 }
100
101 private void decodeHeaders() throws DecodeException {
102
103 ourLog.trace("Header map contains: {}", getHeaders());
104
105 for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
106 String nextHeader = nextEntry.getKey().toLowerCase();
107 String nextValue = nextEntry.getValue();
108
109 ourLog.trace("Next header: {}={}", nextHeader, nextValue);
110
111 if ("transfer-encoding".equals(nextHeader)) {
112 if ("chunked".equalsIgnoreCase(nextValue)) {
113 myTransferEncoding = TransferEncoding.CHUNKED;
114 ourLog.trace("Found chunked transfer encoding");
115 } else {
116 throw new DecodeException("Unknown transfer encoding: " + nextValue);
117 }
118 } else if ("connection".equals(nextHeader)) {
119 if ("close".equals(nextValue)) {
120 myConnectionCloseHeaderIsPresent = true;
121 }
122 } else if ("content-length".equals(nextHeader)) {
123 try {
124 myContentLength = Integer.parseInt(nextValue);
125 ourLog.trace("Found content length: {}", myContentLength);
126 } catch (NumberFormatException e) {
127 addConformanceProblem("Could not parse Content-Length header value: " + nextHeader);
128 }
129 } else if ("content-type".equals(nextHeader)) {
130 int colonIndex = nextValue.indexOf(';');
131 if (colonIndex == -1) {
132 myContentType = nextValue.trim();
133 } else {
134 myContentType = nextValue.substring(0, colonIndex).trim();
135 String charsetDef = nextValue.substring(colonIndex + 1).trim();
136 if (charsetDef.startsWith("charset=")) {
137 String charsetName = charsetDef.substring(8);
138 Charset charset;
139 try {
140 charset = Charset.forName(charsetName);
141 } catch (UnsupportedCharsetException e) {
142 addConformanceProblem("Unsupported or invalid charset: " + charsetName);
143 continue;
144 }
145 setCharset(charset);
146 }
147 }
148
149 myEncodingStyle = EncodingStyle.getEncodingStyleForContentType(myContentType);
150 ourLog.trace("Found content type {} with resolves to encoding style {}", myContentType, myEncodingStyle);
151
152 } else if ("authorization".equals(nextHeader)) {
153 int spaceIndex = nextValue.indexOf(' ');
154 if (spaceIndex == -1) {
155 throw new DecodeException("Invalid authorization header. No authorization style detected");
156 }
157 String type = nextValue.substring(0, spaceIndex);
158 if ("basic".equalsIgnoreCase(type)) {
159 String encodedCredentials = nextValue.substring(spaceIndex + 1);
160 byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
161 String credentialsString = new String(decodedCredentials, getDefaultCharset());
162 int colonIndex = credentialsString.indexOf(':');
163 if (colonIndex == -1) {
164 setUsername(credentialsString);
165 } else {
166 setUsername(credentialsString.substring(0, colonIndex));
167 setPassword(credentialsString.substring(colonIndex + 1));
168 }
169
170 ourLog.trace("Found authorization header with username: {}", getUsername());
171
172 } else {
173 addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
174 }
175
176 } else if ("content-encoding".equals(nextHeader)) {
177 if (StringUtils.isNotBlank(nextValue)) {
178 if ("gzip".equals(nextValue)) {
179 myGzipCoding = true;
180 } else {
181 throw new DecodeException("Unknown Content-Encoding: " + nextValue);
182 }
183 }
184 ourLog.trace("Found content coding: {}", nextValue);
185 } else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
186 ourLog.trace("Found signature: {}", nextValue);
187 mySignature = nextValue;
188 } else {
189 ourLog.trace("Ignoring header {}={}", nextHeader, nextValue);
190 }
191
192 }
193
194 ourLog.trace("Done processing headers");
195
196 }
197
198
199
200
201 protected boolean isConnectionCloseHeaderPresent() {
202 return myConnectionCloseHeaderIsPresent;
203 }
204
205
206
207
208
209
210
211
212
213
214 public EncodingStyle getEncodingStyle() {
215 return myEncodingStyle;
216 }
217
218 private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, IOException, SignatureVerificationException {
219 decodeHeaders();
220 authorize();
221 if (myTransferEncoding == TransferEncoding.CHUNKED) {
222 myBytes = readBytesChunked(theInputStream);
223 } else {
224 myBytes = readBytesNonChunked(theInputStream);
225 }
226
227 decodeBody();
228
229 if (getContentType() == null) {
230 throw new DecodeException("Content-Type not specified");
231 }
232 if (getEncodingStyle() == null) {
233 throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
234 }
235
236 verifySignature();
237 }
238
239 private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
240 ourLog.debug("Decoding message bytes using CHUNKED encoding style");
241 byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
242 ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);
243
244 while (true) {
245 String nextSize;
246 try {
247 nextSize = readLine(theInputStream);
248 } catch (IOException e) {
249 throw new DecodeException("Failed to decode CHUNKED encoding", e);
250 }
251
252 ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);
253
254 if (nextSize.length() == 0) {
255 break;
256 }
257
258 int nextSizeInt;
259 try {
260 nextSizeInt = Integer.parseInt(nextSize, 16);
261 } catch (NumberFormatException e) {
262 throw new DecodeException("Failed to decode CHUNKED encoding", e);
263 }
264
265 ourLog.debug("Next CHUNKED size: {}", nextSizeInt);
266
267 if (nextSizeInt < 0) {
268 throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
269 }
270
271 boolean trailing = false;
272 if (nextSizeInt > 0) {
273 int totalRead = 0;
274 myLastStartedReading = System.currentTimeMillis();
275 do {
276 int nextRead = Math.min(nextSizeInt, byteBuffer.length);
277 int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
278 if (bytesRead == -1) {
279 ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
280 throw new DecodeException("Reached EOF while reading in message chunk");
281 }
282 if (bytesRead == 0) {
283 pauseDuringTimedOutRead();
284 }
285 totalRead += bytesRead;
286
287 if (ourLog.isTraceEnabled()) {
288 ourLog.trace("Read {} byte chunk: {}", bytesRead, new String(byteBuffer, 0, bytesRead));
289 }else {
290 ourLog.debug("Read {} byte chunk", bytesRead);
291 }
292
293 bos.write(byteBuffer, 0, bytesRead);
294
295 } while (totalRead < nextSizeInt);
296 } else {
297 trailing = true;
298 }
299
300
301 int nextChar;
302 boolean had13 = false;
303 boolean had10 = false;
304 while (true) {
305 try {
306 nextChar = theInputStream.read();
307 if (ourLog.isTraceEnabled()) {
308 ourLog.trace("Read byte: " + (char)nextChar + " (" + nextChar + ")");
309 }
310 } catch (SocketTimeoutException e) {
311 break;
312 }
313
314 if (nextChar == -1) {
315 break;
316 } else if (nextChar == 13) {
317 if (had13) {
318
319
320
321
322
323 trailing = true;
324 }
325 had13 = true;
326 } else if (nextChar == 10) {
327 break;
328 } else {
329 break;
330 }
331 }
332
333 if (trailing) {
334 break;
335 }
336
337 }
338
339 return bos.toByteArray();
340 }
341
342 private void verifySignature() throws SignatureVerificationException, DecodeException {
343 if (getSigner() != null && StringUtils.isBlank(mySignature)) {
344 String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
345 throw new SignatureVerificationException("No HL7 Signature found in " + mode);
346 }
347 if (getSigner() != null) {
348 try {
349 getSigner().verify(myBytes, mySignature);
350 } catch (SignatureFailureException e) {
351 throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
352 }
353 }
354 }
355
356 public List<String> getConformanceProblems() {
357 if (myConformanceProblems == null) {
358 myConformanceProblems = new ArrayList<>();
359 }
360 return myConformanceProblems;
361 }
362
363
364
365
366 public String getContentType() {
367 return myContentType;
368 }
369
370
371
372
373 public String getResponseName() {
374 return myResponseName;
375 }
376
377
378
379
380 public Integer getResponseStatus() {
381 return myResponseStatus;
382 }
383
384 protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;
385
386 private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
387 ourLog.debug("Decoding message bytes using non-chunked encoding style");
388
389 int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
390 ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
391
392 byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
393 myLastStartedReading = System.currentTimeMillis();
394 while ((myContentLength < 0 || bos.size() < myContentLength)) {
395 if (myContentLength < 0) {
396 try {
397 if (theInputStream.available() <= 0) {
398 ourLog.trace("No more bytes available");
399 break;
400 }
401 } catch (IOException e) {
402 ourLog.debug("Received IOException while calling inputStream#available()", e);
403 throw e;
404 }
405 }
406
407 int max;
408 if (myContentLength > 0) {
409 max = myContentLength - bos.size();
410 max = Math.min(max, buffer.length);
411 } else {
412 max = buffer.length;
413 }
414
415 try {
416 int bytesRead = theInputStream.read(buffer, 0, max);
417 myLastStartedReading = System.currentTimeMillis();
418 if (bytesRead == -1) {
419 ourLog.trace("Read end of stream");
420 break;
421 } else {
422 if (ourLog.isTraceEnabled()) {
423 ourLog.trace("Read {} bytes from stream:\n{}", bytesRead, ByteUtils.formatBytesForLogging(bytesRead, 0, buffer));
424 }
425 }
426 bos.write(buffer, 0, bytesRead);
427 } catch (SocketTimeoutException e) {
428 long elapsed = System.currentTimeMillis() - myLastStartedReading;
429 if (elapsed > myReadTimeout) {
430 throw e;
431 } else {
432 ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
433 try {
434 Thread.sleep(100);
435 } catch (InterruptedException e1) {
436
437 }
438 }
439 } catch (IOException e) {
440 ourLog.debug("Received IOException while calling inputStream#available()", e);
441 throw e;
442 }
443 }
444
445 return bos.toByteArray();
446 }
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469 public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
470 verifyNotUsed();
471
472 doReadContentsFromInputStreamAndDecode(theInputStream);
473 }
474
475 protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
476 ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
477 String retVal = readLine(theInputStream, true);
478 ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
479 return retVal;
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
508 verifyNotUsed();
509
510 String actionLine = readActionLineAndDecode(theInputStream);
511
512 ourLog.debug("Read action line: {}", actionLine);
513
514 if (getHeaders() == null) {
515 setHeaders(new LinkedHashMap<>());
516
517 while (true) {
518 String nextLine = readLine(theInputStream);
519 if (nextLine.length() == 0) {
520 break;
521 }
522
523 int colonIndex = nextLine.indexOf(':');
524 if (colonIndex == -1) {
525 throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
526 }
527
528 String key = nextLine.substring(0, colonIndex);
529 String value = nextLine.substring(colonIndex + 1).trim();
530
531 ourLog.debug("Read header {}={}", key,value);
532
533 getHeaders().put(key, value);
534 }
535 }
536
537 doReadContentsFromInputStreamAndDecode(theInputStream);
538
539 }
540
541 private String readLine(InputStream theInputStream) throws IOException {
542 try {
543 return readLine(theInputStream, false);
544 } catch (NoMessageReceivedException e) {
545 throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
546 }
547 }
548
549 private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {
550
551 myLastStartedReading = System.currentTimeMillis();
552
553 StringBuilder retVal = new StringBuilder();
554 while (true) {
555
556 int b;
557 try {
558 b = theInputStream.read();
559 if (ourLog.isTraceEnabled()) {
560 ourLog.trace("Read byte: " + (char)b + " (" + b + ")");
561 }
562 } catch (SocketTimeoutException e) {
563 if (retVal.length() == 0 && theFirstLine) {
564 ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
565 throw new NoMessageReceivedException();
566 }
567 ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
568 pauseDuringTimedOutRead();
569 continue;
570 }
571
572 if (b == 13) {
573 } else if (b == 10) {
574 break;
575 } else if (b == -1) {
576 ourLog.debug("Current read line is: {}", retVal);
577 ourLog.info("Read -1 from input stream, closing it");
578 theInputStream.close();
579 if (retVal.length() == 0) {
580 throw new SocketException("Received EOF from input stream");
581 }
582 break;
583 } else if (b < ' ') {
584 } else {
585 retVal.append((char) b);
586 }
587 }
588
589 ourLog.debug("Current read line is: {}", retVal);
590
591 return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
592 }
593
594 private void pauseDuringTimedOutRead() throws SocketTimeoutException {
595 long elapsed = System.currentTimeMillis() - myLastStartedReading;
596 if (elapsed > myReadTimeout) {
597 ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
598 throw new SocketTimeoutException();
599 }
600 try {
601 Thread.sleep(100);
602 } catch (InterruptedException e1) {
603
604 }
605 }
606
607
608
609
610
611 public void setReadTimeout(long theReadTimeout) {
612 myReadTimeout = theReadTimeout;
613 }
614
615
616
617
618
619 public void setResponseName(String theResponseName) {
620 myResponseName = theResponseName;
621 }
622
623
624
625
626
627 public void setResponseStatus(Integer theResponseStatus) {
628 myResponseStatus = theResponseStatus;
629 }
630
631 }