1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package ca.uhn.hl7v2.app;
29
30 import java.io.IOException;
31 import java.net.SocketException;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import ca.uhn.hl7v2.concurrent.Service;
37 import ca.uhn.hl7v2.llp.HL7Reader;
38 import ca.uhn.hl7v2.llp.LLPException;
39
40
41
42
43
44
45
46 public class Receiver extends Service {
47
48 private static final Logger log = LoggerFactory.getLogger(Receiver.class);
49
50 private final ActiveConnection conn;
51 private final HL7Reader in;
52 private ReceiverParserExceptionHandler parserExeptionHandler;
53
54
55 public Receiver(ActiveConnection c, HL7Reader in) {
56 super("Receiver", c.getExecutorService());
57 this.conn = c;
58 this.in = in;
59 }
60
61 public void setParserExeptionHandler(ReceiverParserExceptionHandler parserExeptionHandler) {
62 this.parserExeptionHandler = parserExeptionHandler;
63 }
64
65 @Override
66 protected void handle() {
67 try {
68 String message = in.getMessage();
69 if (message == null) {
70 log.debug("Failed to read a message");
71 } else {
72 processMessage(message);
73 }
74 } catch (LLPException e) {
75
76
77 conn.close();
78 log.info("LLPException: closing Connection from " + describeRemoteConnection() + ", will no longer read messages with this Receiver: " + e.getMessage());
79 if(parserExeptionHandler!=null) {
80 parserExeptionHandler.handle(e);
81 }
82 } catch (SocketException e) {
83
84 conn.close();
85 log.info("SocketException: closing Connection from " + describeRemoteConnection() + ", will no longer read messages with this Receiver: " + e.getMessage());
86 } catch (IOException e) {
87 conn.close();
88 log.warn("IOException: closing Connection from " + describeRemoteConnection() + ", will no longer read messages with this Receiver. ", e);
89 } catch (Exception e) {
90 conn.close();
91 log.error("Unexpected error, closing connection from " + describeRemoteConnection() + " - ", e);
92 }
93
94 }
95
96
97 private String describeRemoteConnection() {
98 return conn.getRemoteAddress().getHostAddress() + ":" + conn.getRemotePort();
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113 protected void processMessage(String message) {
114 String ackID = conn.getParser().getAckID(message);
115 if (ackID == null) {
116 log.debug("Unsolicited Message Received: {}", message);
117 getExecutorService().submit(new Grunt(conn, message));
118 } else {
119 if ( conn.acceptAllMessages() ){
120 getExecutorService().submit(new Grunt(conn, message));
121 }else if (!conn.isRecipientWaiting(ackID, message)) {
122 log.info("Unexpected Message Received. This message appears to be an acknowledgement (MSA-2 has a value) so it will be ignored: {}", message);
123 } else {
124 log.debug("Response Message Received: {}", message);
125 }
126 }
127 }
128
129
130 private static class Grunt implements Runnable {
131
132 private final ActiveConnection conn;
133 private final String m;
134
135 public Grunt(ActiveConnection conn, String message) {
136 this.conn = conn;
137 this.m = message;
138 }
139
140 public void run() {
141 try {
142 String response = conn.getResponder().processMessage(m);
143 if (response != null) {
144 conn.getAckWriter().writeMessage(response);
145 } else {
146 log.debug("Not responding to incoming message");
147 }
148 } catch (Exception e) {
149 log.error("Error while processing message: ", e);
150 }
151 }
152 }
153
154
155
156
157 public static interface ReceiverParserExceptionHandler {
158 void handle(Exception e);
159 }
160 }