001package ca.uhn.hl7v2.hoh.encoder;
002
003import java.io.ByteArrayOutputStream;
004import java.io.IOException;
005import java.io.InputStream;
006import java.net.SocketException;
007import java.net.SocketTimeoutException;
008import java.nio.charset.Charset;
009import java.nio.charset.UnsupportedCharsetException;
010import java.util.ArrayList;
011import java.util.LinkedHashMap;
012import java.util.List;
013import java.util.Map;
014import java.util.regex.Pattern;
015
016import ca.uhn.hl7v2.hoh.api.DecodeException;
017import ca.uhn.hl7v2.hoh.api.NonHl7ResponseException;
018import ca.uhn.hl7v2.hoh.sign.SignatureFailureException;
019import ca.uhn.hl7v2.hoh.sign.SignatureVerificationException;
020import ca.uhn.hl7v2.hoh.util.ByteUtils;
021import ca.uhn.hl7v2.hoh.util.GZipUtils;
022import ca.uhn.hl7v2.hoh.util.IOUtils;
023import ca.uhn.hl7v2.hoh.util.StringUtils;
024import ca.uhn.hl7v2.hoh.util.repackage.Base64;
025
026public abstract class AbstractHl7OverHttpDecoder extends AbstractHl7OverHttp {
027
// Matches runs of whitespace; used by readLine(InputStream, boolean) to
// collapse them into a single space before the line is returned
private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+");

/**
 * Default amount of time that the decoder will attempt to read before
 * timing out and throwing an IOException (30000ms)
 *
 * @see #setReadTimeout(long)
 */
public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;

private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(AbstractHl7OverHttpDecoder.class);

// Raw body bytes as read from the input stream (before any GZIP
// decompression); also the bytes handed to the signer for verification
private byte[] myBytes;
// Lazily created list of non-fatal problems found while decoding
private List<String> myConformanceProblems;
// Value of the Content-Length header; stays -1 until one is parsed
private int myContentLength = -1;
// Content-Type header value with any ";..." parameter section removed
private String myContentType;
// True once a "Content-Encoding: gzip" header has been seen
private boolean myGzipCoding;
// Timestamp (System.currentTimeMillis()) compared against myReadTimeout
// when deciding whether a stalled read has timed out
private long myLastStartedReading;
// Read timeout in milliseconds, see setReadTimeout(long)
private long myReadTimeout = DEFAULT_READ_TIMEOUT;
// Only exposed through getResponseName()/setResponseName(String) in this class
private String myResponseName;
// Only exposed through getResponseStatus()/setResponseStatus(Integer) in this class
private Integer myResponseStatus;
// Set to CHUNKED when a "Transfer-Encoding: chunked" header is seen;
// otherwise left unset
private TransferEncoding myTransferEncoding;
// Value of the HL7 signature header, if one was present
private String mySignature;
// Encoding style resolved from the Content-Type header
private EncodingStyle myEncodingStyle;

// True once a "Connection: close" header has been seen
private boolean myConnectionCloseHeaderIsPresent;
054
055        private void addConformanceProblem(String theString) {
056                ourLog.debug("Conformance problem detected: {}", theString);
057                if (myConformanceProblems == null) {
058                        myConformanceProblems = new ArrayList<String>();
059                }
060                myConformanceProblems.add(theString);
061        }
062
/**
 * Authorization hook implemented by concrete decoders. Within this class
 * it is invoked after the headers have been decoded (so any credentials
 * from a basic <code>Authorization</code> header have already been stored)
 * and before the message body is read or decoded.
 *
 * @throws AuthorizationFailureException
 *             If the authorization check fails
 */
protected abstract void authorize() throws AuthorizationFailureException;
064
065        public void decode() throws DecodeException, SignatureVerificationException {
066                ourLog.trace("Entering decode()");
067                
068                verifyNotUsed();
069
070                decodeHeaders();
071                authorize();
072                decodeBody();
073                verifySignature();
074
075                ourLog.trace("Exiting decode()");
076        }
077
078        private void decodeBody() throws DecodeException {
079                byte[] bytes = myBytes;
080
081                if (myGzipCoding) {
082                        ourLog.debug("Decoding message contents using GZIP encoding style");
083                        try {
084                                bytes = GZipUtils.uncompress(bytes);
085                        } catch (IOException e) {
086                                throw new DecodeException("Failed to uncompress GZip content", e);
087                        }
088                }
089
090                Charset charset = getCharset();
091                
092                ourLog.debug("Message is {} bytes with charset {}", bytes.length, charset.name());
093                if (ourLog.isTraceEnabled()) {
094                        ourLog.trace("Raw message: {}", StringUtils.asciiEscape(bytes, charset));
095                }
096                
097                String messageString = new String(bytes, charset);
098                setMessage(messageString);
099        }
100
101        private void decodeHeaders() throws DecodeException {
102
103                ourLog.trace("Header map contains: {}", getHeaders());
104
105                for (Map.Entry<String, String> nextEntry : getHeaders().entrySet()) {
106                        String nextHeader = nextEntry.getKey().toLowerCase();
107                        String nextValue = nextEntry.getValue();
108
109                        ourLog.trace("Next header: {}={}", nextHeader, nextValue);
110                        
111                        if ("transfer-encoding".equals(nextHeader)) {
112                                if ("chunked".equalsIgnoreCase(nextValue)) {
113                                        myTransferEncoding = TransferEncoding.CHUNKED;
114                                        ourLog.trace("Found chunked transfer encoding");
115                                } else {
116                                        throw new DecodeException("Unknown transfer encoding: " + nextValue);
117                                }
118                        } else if ("connection".equals(nextHeader)) {
119                                if ("close".equals(nextValue)) {
120                                        myConnectionCloseHeaderIsPresent = true;
121                                }
122                        } else if ("content-length".equals(nextHeader)) {
123                                try {
124                                        myContentLength = Integer.parseInt(nextValue);
125                                        ourLog.trace("Found content length: {}", myContentLength);
126                                } catch (NumberFormatException e) {
127                                        addConformanceProblem("Could not parse Content-Length header value: " + nextHeader);
128                                }
129                        } else if ("content-type".equals(nextHeader)) {
130                                int colonIndex = nextValue.indexOf(';');
131                                if (colonIndex == -1) {
132                                        myContentType = nextValue.trim();
133                                } else {
134                                        myContentType = nextValue.substring(0, colonIndex).trim();
135                                        String charsetDef = nextValue.substring(colonIndex + 1).trim();
136                                        if (charsetDef.startsWith("charset=")) {
137                                                String charsetName = charsetDef.substring(8);
138                                                Charset charset;
139                                                try {
140                                                        charset = Charset.forName(charsetName);
141                                                } catch (UnsupportedCharsetException e) {
142                                                        addConformanceProblem("Unsupported or invalid charset: " + charsetName);
143                                                        continue;
144                                                }
145                                                setCharset(charset);
146                                        }
147                                }
148
149                                myEncodingStyle = EncodingStyle.getEncodingStyleForContentType(myContentType);
150                                ourLog.trace("Found content type {} with resolves to encoding style {}", myContentType, myEncodingStyle);
151
152                        } else if ("authorization".equals(nextHeader)) {
153                                int spaceIndex = nextValue.indexOf(' ');
154                                if (spaceIndex == -1) {
155                                        throw new DecodeException("Invalid authorization header. No authorization style detected");
156                                }
157                                String type = nextValue.substring(0, spaceIndex);
158                                if ("basic".equalsIgnoreCase(type)) {
159                                        String encodedCredentials = nextValue.substring(spaceIndex + 1);
160                                        byte[] decodedCredentials = Base64.decodeBase64(encodedCredentials);
161                                        String credentialsString = new String(decodedCredentials, getDefaultCharset());
162                                        int colonIndex = credentialsString.indexOf(':');
163                                        if (colonIndex == -1) {
164                                                setUsername(credentialsString);
165                                        } else {
166                                                setUsername(credentialsString.substring(0, colonIndex));
167                                                setPassword(credentialsString.substring(colonIndex + 1));
168                                        }
169                                        
170                                        ourLog.trace("Found authorization header with username: {}", getUsername());
171                                        
172                                } else {
173                                        addConformanceProblem("Invalid authorization type. Only basic authorization is supported.");
174                                }
175                                
176                        } else if ("content-encoding".equals(nextHeader)) {
177                                if (StringUtils.isNotBlank(nextValue)) {
178                                        if ("gzip".equals(nextValue)) {
179                                                myGzipCoding = true;
180                                        } else {
181                                                throw new DecodeException("Unknown Content-Encoding: " + nextValue);
182                                        }
183                                }
184                                ourLog.trace("Found content coding: {}", nextValue);
185                        } else if (HTTP_HEADER_HL7_SIGNATURE_LC.equals(nextHeader)) {
186                                ourLog.trace("Found signature: {}", nextValue);
187                                mySignature = nextValue;
188                        } else {
189                                ourLog.trace("Ignoring header {}={}", nextHeader, nextValue);
190                        }
191
192                }
193
194                ourLog.trace("Done processing headers");
195
196        }
197
198        /**
199         * Protected because this doesn't make sense for a sender
200         */
201        protected boolean isConnectionCloseHeaderPresent() {
202                return myConnectionCloseHeaderIsPresent;
203        }
204
205        /**
206         * Returns the {@link EncodingStyle} associated with the incoming message,
207         * or <code>null</code>. This will be set automatically based on the value
208         * of the <code>Content-Type</code> header, and will be set to
209         * <code>null</code> if the content type is not provided, or if the content
210         * type does not correspond to an HL7 type.
211         * 
212         * @see {@link EncodingStyle} for a list of appropriate content types
213         */
214        public EncodingStyle getEncodingStyle() {
215                return myEncodingStyle;
216        }
217
218        private void doReadContentsFromInputStreamAndDecode(InputStream theInputStream) throws DecodeException, AuthorizationFailureException, IOException, SignatureVerificationException {
219                decodeHeaders();
220                authorize();
221                if (myTransferEncoding == TransferEncoding.CHUNKED) {
222                        myBytes = readBytesChunked(theInputStream);
223                } else {
224                        myBytes = readBytesNonChunked(theInputStream);
225                }
226
227                decodeBody();
228                
229                if (getContentType() == null) {
230                        throw new DecodeException("Content-Type not specified");
231                }
232                if (getEncodingStyle() == null) {
233                        throw new NonHl7ResponseException("Invalid Content-Type: " + getContentType(), getContentType(), getMessage());
234                }
235                
236                verifySignature();
237        }
238
239        private byte[] readBytesChunked(InputStream theInputStream) throws DecodeException, IOException {
240                ourLog.debug("Decoding message bytes using CHUNKED encoding style");
241                byte[] byteBuffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
242                ByteArrayOutputStream bos = new ByteArrayOutputStream(IOUtils.DEFAULT_BUFFER_SIZE);
243
244                while (true) {
245                        String nextSize;
246                        try {
247                                nextSize = readLine(theInputStream);
248                        } catch (IOException e) {
249                                throw new DecodeException("Failed to decode CHUNKED encoding", e);
250                        }
251
252                        ourLog.trace("Going to interpret CHUNKED size value: {}", nextSize);
253                        
254                        if (nextSize.length() == 0) {
255                                break;
256                        }
257
258                        int nextSizeInt;
259                        try {
260                                nextSizeInt = Integer.parseInt(nextSize, 16);
261                        } catch (NumberFormatException e) {
262                                throw new DecodeException("Failed to decode CHUNKED encoding", e);
263                        }
264
265                        ourLog.debug("Next CHUNKED size: {}", nextSizeInt);
266
267                        if (nextSizeInt < 0) {
268                                throw new DecodeException("Received invalid octet count in chunked transfer encoding: " + nextSize);
269                        }
270
271                        boolean trailing = false;
272                        if (nextSizeInt > 0) {
273                                int totalRead = 0;
274                                myLastStartedReading = System.currentTimeMillis();
275                                do {
276                                        int nextRead = Math.min(nextSizeInt, byteBuffer.length);
277                                        int bytesRead = theInputStream.read(byteBuffer, 0, nextRead);
278                                        if (bytesRead == -1) {
279                                                ourLog.debug("Exception in readBytesChunked(InputStream): Reached EOF. Buffer has {} bytes", bos.size());
280                                                throw new DecodeException("Reached EOF while reading in message chunk");
281                                        }
282                                        if (bytesRead == 0 && totalRead < nextSizeInt) {
283                                                pauseDuringTimedOutRead();
284                                        }
285                                        totalRead += bytesRead;
286
287                                        if (ourLog.isTraceEnabled()) {
288                                                ourLog.trace("Read {} byte chunk: {}", bytesRead, new String(byteBuffer, 0, bytesRead));
289                                        }else {
290                                                ourLog.debug("Read {} byte chunk", bytesRead);
291                                        }
292                                        
293                                        bos.write(byteBuffer, 0, bytesRead);
294
295                                } while (totalRead < nextSizeInt);
296                        } else {
297                                trailing = true;
298                        }
299
300                        // Try to read a trailing CRLF
301                        int nextChar;
302                        boolean had13 = false;
303                        boolean had10 = false;
304                        while (true) {
305                                try {
306                                        nextChar = theInputStream.read();
307                                        if (ourLog.isTraceEnabled()) {
308                                                ourLog.trace("Read byte: " + (char)nextChar + " (" + nextChar + ")");
309                                        }
310                                } catch (SocketTimeoutException e) {
311                                        break;
312                                }
313
314                                if (nextChar == -1) {
315                                        break;
316                                } else if (nextChar == 13) {
317                                        if (had13) {
318                                                /* 
319                                                 * This is an attempt to be tolerant of people using the wrong
320                                                 * end of line sequence (it should be CRLF), as is the 
321                                                 * had10 below 
322                                                 */
323                                                trailing = true;
324                                        }
325                                        had13 = true;
326                                        continue;
327                                } else if (nextChar == 10) {
328                                        if (had10) {
329                                                trailing = true;
330                                        }
331                                        break;
332                                } else {
333                                        break;
334                                }
335                        }
336                        
337                        if (trailing) {
338                                break;
339                        }
340
341                } // while
342
343                return bos.toByteArray();
344        }
345
346        private void verifySignature() throws SignatureVerificationException, DecodeException {
347                if (getSigner() != null && StringUtils.isBlank(mySignature)) {
348                        String mode = (this instanceof Hl7OverHttpRequestDecoder) ? "request" : "response";
349                        throw new SignatureVerificationException("No HL7 Signature found in " + mode);
350                }
351                if (getSigner() != null) {
352                        try {
353                                getSigner().verify(myBytes, mySignature);
354                        } catch (SignatureFailureException e) {
355                                throw new DecodeException("Failed to verify signature due to an error (signature may possibly be valid, but verification failed)", e);
356                        }
357                }
358        }
359
360        public List<String> getConformanceProblems() {
361                if (myConformanceProblems == null) {
362                        myConformanceProblems = new ArrayList<String>();
363                }
364                return myConformanceProblems;
365        }
366
367        /**
368         * @return Returns the content type associated with the message (e.g. application/hl7-v2)
369         */
370        public String getContentType() {
371                return myContentType;
372        }
373
374        /**
375         * @return the responseName
376         */
377        public String getResponseName() {
378                return myResponseName;
379        }
380
381        /**
382         * @return the responseStatus
383         */
384        public Integer getResponseStatus() {
385                return myResponseStatus;
386        }
387
/**
 * Hook implemented by concrete decoders. It is the first thing invoked by
 * {@link #readHeadersAndContentsFromInputStreamAndDecode(InputStream)},
 * before any header lines are read, and its return value is logged there
 * as the "action line".
 * NOTE(review): presumably this reads and interprets the first line of the
 * HTTP request or response; confirm against the implementations.
 *
 * @param theInputStream
 *            The stream to read from
 * @return The action line
 */
protected abstract String readActionLineAndDecode(InputStream theInputStream) throws IOException, NoMessageReceivedException, DecodeException;
389
390        private byte[] readBytesNonChunked(InputStream theInputStream) throws IOException {
391                ourLog.debug("Decoding message bytes using non-chunked encoding style");
392
393                int length = myContentLength > 0 ? myContentLength : IOUtils.DEFAULT_BUFFER_SIZE;
394                ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
395
396                byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
397                myLastStartedReading = System.currentTimeMillis();
398                while ((myContentLength < 0 || bos.size() < myContentLength)) {
399                        if (myContentLength < 0) {
400                                try {
401                                        if (theInputStream.available() <= 0) {
402                                                ourLog.trace("No more bytes available");
403                                                break;
404                                        }
405                                } catch (IOException e) {
406                                        ourLog.debug("Received IOException while calling inputStream#available()", e);
407                                        throw e;
408                                }
409                        }
410
411                        int max;
412                        if (myContentLength > 0) {
413                                max = myContentLength - bos.size();
414                                max = Math.min(max, buffer.length);
415                        } else {
416                                max = buffer.length;
417                        }
418                        
419                        try {
420                                int bytesRead = theInputStream.read(buffer, 0, max);
421                                myLastStartedReading = System.currentTimeMillis();
422                                if (bytesRead == -1) {
423                                        ourLog.trace("Read end of stream");
424                                        break;
425                                } else {
426                                        if (ourLog.isTraceEnabled()) {
427                                                ourLog.trace("Read {} bytes from stream:\n{}", bytesRead, ByteUtils.formatBytesForLogging(bytesRead, 0, buffer));
428                                        }
429                                }
430                                bos.write(buffer, 0, bytesRead);
431                        } catch (SocketTimeoutException e) {
432                                long elapsed = System.currentTimeMillis() - myLastStartedReading;
433                                if (elapsed > myReadTimeout) {
434                                        throw e;
435                                } else {
436                                        ourLog.debug("Trying to read for {} / {}ms, going to keep trying", elapsed, myReadTimeout);
437                                        try {
438                                                Thread.sleep(100);
439                                        } catch (InterruptedException e1) {
440                                                // ignore
441                                        }
442                                }
443                        } catch (IOException e) {
444                                ourLog.debug("Received IOException while calling inputStream#available()", e);
445                                throw e;
446                        }
447                }
448
449                return bos.toByteArray();
450        }
451
452        /**
453         * Read in the contents of the raw message from the input stream and decode
454         * entire the message. This method assumes that the headers have been
455         * provided using {@link #setHeaders(LinkedHashMap)}
456         * 
457         * @param theInputStream
458         *            The inputstream to read the raw message from
459         * @throws AuthorizationFailureException
460         *             If the authorization check fails. This will only be thrown if
461         *             this decoder is decoding a request message, and an
462         *             authorization callback has been provided, and the
463         *             authorization fails.
464         * @throws DecodeException
465         *             If the message can not be decoded for any reason
466         * @throws IOException
467         *             If there is a failure while reading from the inputstream
468         * @throws SignatureVerificationException
469         *             If the signature verification fails. This will only occur if
470         *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
471         *             has been provided.
472         */
473        public void readContentsFromInputStreamAndDecode(InputStream theInputStream) throws AuthorizationFailureException, DecodeException, IOException, SignatureVerificationException {
474                verifyNotUsed();
475
476                doReadContentsFromInputStreamAndDecode(theInputStream);
477        }
478
479        protected String readFirstLine(InputStream theInputStream) throws IOException, NoMessageReceivedException {
480                ourLog.trace("Entering readFirstLine(InputStream) with IS: {}", theInputStream);
481                String retVal = readLine(theInputStream, true);
482                ourLog.trace("Exiting readFirstLine(InputStream) with result: {}", retVal);
483                return retVal;
484        }
485
486        /**
487         * Note that if {@link #setPath(String)} is called, this method will assume
488         * that the first line of the HTTP request has already been read from the
489         * input stream. If {@link #setHeaders(java.util.LinkedHashMap)} has been
490         * called, this method will assume that the HTTP headers have already been
491         * read from the input stream as well as the double-LF (ASCII-10) that
492         * proceeds the headers.
493         * 
494         * 
495         * @param theInputStream
496         *            The inputstream to read the raw message from
497         * @throws AuthorizationFailureException
498         *             If the authorization check fails. This will only be thrown if
499         *             this decoder is decoding a request message, and an
500         *             authorization callback has been provided, and the
501         *             authorization fails.
502         * @throws DecodeException
503         *             If the message can not be decoded for any reason
504         * @throws IOException
505         *             If there is a failure while reading from the inputstream
506         * @throws SignatureVerificationException
507         *             If the signature verification fails. This will only occur if
508         *             {@link #setSigner(ca.uhn.hl7v2.hoh.sign.ISigner) a signer}
509         *             has been provided.
510         */
511        public void readHeadersAndContentsFromInputStreamAndDecode(InputStream theInputStream) throws IOException, DecodeException, NoMessageReceivedException, SignatureVerificationException {
512                verifyNotUsed();
513
514                String actionLine = readActionLineAndDecode(theInputStream);
515
516                ourLog.debug("Read action line: {}", actionLine);
517
518                if (getHeaders() == null) {
519                        setHeaders(new LinkedHashMap<String, String>());
520
521                        while (true) {
522                                String nextLine = readLine(theInputStream);
523                                if (nextLine.length() == 0) {
524                                        break;
525                                }
526
527                                int colonIndex = nextLine.indexOf(':');
528                                if (colonIndex == -1) {
529                                        throw new DecodeException("Invalid HTTP header line detected. Value is: " + nextLine);
530                                }
531
532                                String key = nextLine.substring(0, colonIndex);
533                                String value = nextLine.substring(colonIndex + 1).trim();
534                                
535                                ourLog.debug("Read header {}={}", key,value);
536                                
537                                getHeaders().put(key, value);
538                        }
539                }
540
541                doReadContentsFromInputStreamAndDecode(theInputStream);
542
543        }
544
545        private String readLine(InputStream theInputStream) throws IOException {
546                try {
547                        return readLine(theInputStream, false);
548                } catch (NoMessageReceivedException e) {
549                        throw new Error("Threw a NoMessageReceivedException. This should not happen.", e);
550                }
551        }
552
553        private String readLine(InputStream theInputStream, boolean theFirstLine) throws IOException, NoMessageReceivedException {
554                
555                myLastStartedReading = System.currentTimeMillis();
556
557                StringBuilder retVal = new StringBuilder();
558                while (true) {
559
560                        int b;
561                        try {
562                                b = theInputStream.read();
563                                if (ourLog.isTraceEnabled()) {
564                                        ourLog.trace("Read byte: " + (char)b + " (" + b + ")");
565                                }
566                        } catch (SocketTimeoutException e) {
567                                if (retVal.length() == 0 && theFirstLine) {
568                                        ourLog.trace("No message received, aborting readLine(InputStream, boolean)");
569                                        throw new NoMessageReceivedException();
570                                }
571                                ourLog.trace("No message received in readLine(InputStream, boolean), going to wait and continue");
572                                pauseDuringTimedOutRead();
573                                continue;
574                        }
575
576                        if (b == 13) {
577                                continue;
578                        } else if (b == 10) {
579                                break;
580                        } else if (b == -1) {
581                                ourLog.debug("Current read line is: {}", retVal);
582                                ourLog.info("Read -1 from input stream, closing it");
583                                theInputStream.close();
584                                if (retVal.length() == 0) {
585                                        throw new SocketException("Received EOF from input stream");
586                                }
587                                break;
588                        } else if (b < ' ') {
589                                continue;
590                        } else {
591                                retVal.append((char) b);
592                        }
593                }
594
595                ourLog.debug("Current read line is: {}", retVal);
596
597                return WHITESPACE_PATTERN.matcher(retVal.toString()).replaceAll(" ").trim();
598        }
599
600        private void pauseDuringTimedOutRead() throws SocketTimeoutException {
601                long elapsed = System.currentTimeMillis() - myLastStartedReading;
602                if (elapsed > myReadTimeout) {
603                        ourLog.trace("Elapsed time of {} exceeds max {}, throwing SocketTimeoutException", elapsed, myReadTimeout);
604                        throw new SocketTimeoutException();
605                }
606                try {
607                        Thread.sleep(100);
608                } catch (InterruptedException e1) {
609                        // ignore
610                }
611        }
612
613        /**
614         * Sets the number of milliseconds that the decoder will attempt to read
615         * from an InputStream before timing out and throwing an exception
616         */
617        public void setReadTimeout(long theReadTimeout) {
618                myReadTimeout = theReadTimeout;
619        }
620
621        /**
622         * @param theResponseName
623         *            the responseName to set
624         */
625        public void setResponseName(String theResponseName) {
626                myResponseName = theResponseName;
627        }
628
629        /**
630         * @param theResponseStatus
631         *            the responseStatus to set
632         */
633        public void setResponseStatus(Integer theResponseStatus) {
634                myResponseStatus = theResponseStatus;
635        }
636
637}