Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
HL7Service |
|
| 2.0833333333333335;2.083 | ||||
HL7Service$ConnectionCleaner |
|
| 2.0833333333333335;2.083 |
1 | /** | |
2 | The contents of this file are subject to the Mozilla Public License Version 1.1 | |
3 | (the "License"); you may not use this file except in compliance with the License. | |
4 | You may obtain a copy of the License at http://www.mozilla.org/MPL/ | |
5 | Software distributed under the License is distributed on an "AS IS" basis, | |
6 | WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the | |
7 | specific language governing rights and limitations under the License. | |
8 | ||
9 | The Original Code is "HL7Service.java". Description: | |
10 | "Accepts incoming TCP/IP connections and creates Connection objects" | |
11 | ||
12 | The Initial Developer of the Original Code is University Health Network. Copyright (C) | |
13 | 2001. All Rights Reserved. | |
14 | ||
15 | Contributor(s): Kyle Buza | |
16 | ||
17 | Alternatively, the contents of this file may be used under the terms of the | |
18 | GNU General Public License (the "GPL"), in which case the provisions of the GPL are | |
19 | applicable instead of those above. If you wish to allow use of your version of this | |
20 | file only under the terms of the GPL and not to allow others to use your version | |
21 | of this file under the MPL, indicate your decision by deleting the provisions above | |
22 | and replace them with the notice and other provisions required by the GPL License. | |
23 | If you do not delete the provisions above, a recipient may use your version of | |
24 | this file under either the MPL or the GPL. | |
25 | ||
26 | */ | |
27 | ||
28 | package ca.uhn.hl7v2.app; | |
29 | ||
30 | import ca.uhn.hl7v2.HL7Exception; | |
31 | import ca.uhn.hl7v2.HapiContext; | |
32 | import ca.uhn.hl7v2.concurrent.DefaultExecutorService; | |
33 | import ca.uhn.hl7v2.concurrent.Service; | |
34 | import ca.uhn.hl7v2.llp.LowerLayerProtocol; | |
35 | import ca.uhn.hl7v2.model.Message; | |
36 | import ca.uhn.hl7v2.parser.Parser; | |
37 | import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData; | |
38 | import ca.uhn.hl7v2.protocol.ReceivingApplication; | |
39 | import ca.uhn.hl7v2.protocol.ReceivingApplicationExceptionHandler; | |
40 | import ca.uhn.hl7v2.protocol.impl.AppRoutingDataImpl; | |
41 | import ca.uhn.hl7v2.protocol.impl.AppWrapper; | |
42 | import ca.uhn.hl7v2.protocol.impl.ApplicationRouterImpl; | |
43 | import org.slf4j.Logger; | |
44 | import org.slf4j.LoggerFactory; | |
45 | ||
46 | import java.io.BufferedReader; | |
47 | import java.io.File; | |
48 | import java.io.FileReader; | |
49 | import java.io.IOException; | |
50 | import java.util.*; | |
51 | import java.util.concurrent.ExecutorService; | |
52 | ||
53 | /** | |
54 | * <p> | |
55 | * An HL7 service. Accepts incoming TCP/IP connections and creates Connection | |
56 | * objects. Uses a single ApplicationRouter object (for all Connections) to | |
57 | * define the Applications to which messages are sent. To configure, use | |
58 | * registerApplication() or loadApplicationsFromFile(). | |
59 | * </p> | |
60 | * <p>A separate thread looks for Connections that have been closed (locally or | |
61 | * remotely) and discards them. </p> | |
62 | * | |
63 | * @author Bryan Tripp | |
64 | * @author Christian Ohr | |
65 | */ | |
66 | 464 | public abstract class HL7Service extends Service { |
67 | ||
68 | 5 | private static final Logger log = LoggerFactory.getLogger(HL7Service.class); |
69 | ||
70 | private final List<Connection> connections; | |
71 | private final Parser parser; | |
72 | private final LowerLayerProtocol llp; | |
73 | private final List<ConnectionListener> listeners; | |
74 | private final ConnectionCleaner cleaner; | |
75 | private final ApplicationRouterImpl applicationRouter; | |
76 | ||
/**
 * Creates a service whose parser, lower layer protocol and executor
 * service are all obtained from the given context.
 *
 * @param theHapiContext context supplying the generic parser, the lower
 *                       layer protocol and the executor service
 */
public HL7Service(HapiContext theHapiContext) {
    this(
            theHapiContext.getGenericParser(),
            theHapiContext.getLowerLayerProtocol(),
            theHapiContext.getExecutorService());
}
83 | ||
/**
 * Creates a service that runs on the executor service returned by
 * {@link DefaultExecutorService#getDefaultService()}.
 *
 * @param parser parser to be used
 * @param llp    lower layer protocol to be used
 */
public HL7Service(final Parser parser, final LowerLayerProtocol llp) {
    this(
            parser,
            llp,
            DefaultExecutorService.getDefaultService());
}
91 | ||
92 | /** | |
93 | * Creates a new instance of Server | |
94 | * | |
95 | * @param parser parser to be used | |
96 | * @param llp LowerLayerProtocol | |
97 | * @param executorService executor used for starting threads | |
98 | */ | |
99 | public HL7Service(Parser parser, LowerLayerProtocol llp, | |
100 | ExecutorService executorService) { | |
101 | 145 | super("HL7 Server", executorService); |
102 | 145 | this.connections = new ArrayList<Connection>(); |
103 | 145 | this.listeners = new ArrayList<ConnectionListener>(); |
104 | 145 | this.parser = parser; |
105 | 145 | this.llp = llp; |
106 | 145 | this.applicationRouter = new ApplicationRouterImpl(parser); |
107 | 145 | this.cleaner = new ConnectionCleaner(this); |
108 | ||
109 | // 960101 | |
110 | 145 | assert !this.cleaner.isRunning(); |
111 | 145 | } |
112 | ||
113 | /** | |
114 | * Called after startup before the thread enters its main loop. This | |
115 | * implementation launches a cleaner thread that removes stale connections | |
116 | * from the connection list. Override to initialize resources for the | |
117 | * running thread, e.g. opening {@link java.net.ServerSocket}s etc. | |
118 | */ | |
119 | @Override | |
120 | protected void afterStartup() { | |
121 | // Fix for bug 960101: Don't start the cleaner thread until the | |
122 | // server is started. | |
123 | 135 | cleaner.start(); |
124 | 135 | } |
125 | ||
126 | /** | |
127 | * Called after the thread has left its main loop. This implementation stops | |
128 | * the connection cleaner thread and closes any open connections. Override | |
129 | * to clean up additional resources from the running thread, e.g. closing | |
130 | * {@link java.net.ServerSocket}s. | |
131 | */ | |
132 | @Override | |
133 | protected void afterTermination() { | |
134 | 105 | super.afterTermination(); |
135 | 105 | cleaner.stopAndWait(); |
136 | 105 | for (Connection c : connections) { |
137 | 62 | c.close(); |
138 | 62 | } |
139 | 105 | } |
140 | ||
141 | /** | |
142 | * Returns true if the thread should continue to run, false otherwise (ie if | |
143 | * stop() has been called). | |
144 | * | |
145 | * @deprecated Use {@link #isRunning()}. Deprecated as of version 0.6. | |
146 | */ | |
147 | protected boolean keepRunning() { | |
148 | 0 | return isRunning(); |
149 | } | |
150 | ||
151 | LowerLayerProtocol getLlp() { | |
152 | 234 | return llp; |
153 | } | |
154 | ||
155 | Parser getParser() { | |
156 | 234 | return parser; |
157 | } | |
158 | ||
159 | /** | |
160 | * Called by subclasses when a new Connection is made. Registers the | |
161 | * ApplicationRouter with the given Connection and stores it. | |
162 | * | |
163 | * @param c existing connection | |
164 | */ | |
165 | public synchronized void newConnection(ActiveConnection c) { | |
166 | 234 | c.getResponder().setApplicationRouter(applicationRouter); |
167 | 234 | c.activate(); |
168 | 234 | connections.add(c); // keep track of connections |
169 | 234 | notifyListeners(c); |
170 | 234 | } |
171 | ||
172 | /** | |
173 | * Returns a connection to a remote host that was initiated by the given | |
174 | * remote host. If the connection has not been made, this method blocks | |
175 | * until the remote host connects. | |
176 | * | |
177 | * @param ipAddress IP Address | |
178 | * @return connection that was initiated by the given address | |
179 | */ | |
180 | public Connection getRemoteConnection(String ipAddress) { | |
181 | 0 | Connection conn = null; |
182 | 0 | while (conn == null) { |
183 | // check all connections ... | |
184 | 0 | int c = 0; |
185 | 0 | synchronized (this) { |
186 | 0 | while (conn == null && c < connections.size()) { |
187 | 0 | Connection nextConn = connections.get(c); |
188 | 0 | if (nextConn.getRemoteAddress().getHostAddress().equals(ipAddress)) |
189 | 0 | conn = nextConn; |
190 | 0 | c++; |
191 | 0 | } |
192 | 0 | } |
193 | ||
194 | 0 | if (conn == null) { |
195 | try { | |
196 | 0 | Thread.sleep(100); |
197 | 0 | } catch (InterruptedException e) { |
198 | // don't care | |
199 | 0 | } |
200 | } | |
201 | 0 | } |
202 | 0 | return conn; |
203 | } | |
204 | ||
205 | /** | |
206 | * Returns all currently active connections. | |
207 | * | |
208 | * @return list of active remote connections | |
209 | */ | |
210 | public synchronized List<Connection> getRemoteConnections() { | |
211 | 3325 | return connections; |
212 | } | |
213 | ||
214 | /** | |
215 | * Registers the given ConnectionListener with the HL7Service - when a | |
216 | * remote host makes a new Connection, all registered listeners will be | |
217 | * notified. | |
218 | * | |
219 | * @param listener connection listener to be called | |
220 | */ | |
221 | public synchronized void registerConnectionListener( | |
222 | ConnectionListener listener) { | |
223 | 5 | listeners.add(listener); |
224 | 5 | } |
225 | ||
226 | /** | |
227 | * Notifies all listeners that a Connection is new or discarded. | |
228 | */ | |
229 | private void notifyListeners(Connection c) { | |
230 | 396 | for (ConnectionListener cl : listeners) { |
231 | 10 | if (c.isOpen()) { |
232 | 5 | cl.connectionReceived(c); |
233 | } else { | |
234 | 5 | cl.connectionDiscarded(c); |
235 | } | |
236 | 10 | } |
237 | 396 | } |
238 | ||
239 | /** | |
240 | * Registers the given application to handle messages corresponding to the | |
241 | * given type and trigger event. Only one application can be registered for | |
242 | * a given message type and trigger event combination. A repeated | |
243 | * registration for a particular combination of type and trigger event | |
244 | * over-writes the previous one. Note that the wildcard "*" for messageType | |
245 | * or triggerEvent means any type or event, respectively. | |
246 | * | |
247 | * @deprecated use {@link #registerApplication(String, String, ca.uhn.hl7v2.protocol.ReceivingApplication)} and | |
248 | * {@link ca.uhn.hl7v2.protocol.impl.AppWrapper} | |
249 | */ | |
250 | public synchronized void registerApplication(String messageType, | |
251 | String triggerEvent, Application handler) { | |
252 | 0 | ReceivingApplication<Message> handlerWrapper = new AppWrapper(handler); |
253 | 0 | applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handlerWrapper); |
254 | 0 | } |
255 | ||
256 | /** | |
257 | * Registers the given application to handle messages corresponding to the | |
258 | * given type and trigger event. Only one application can be registered for | |
259 | * a given message type and trigger event combination. A repeated | |
260 | * registration for a particular combination of type and trigger event | |
261 | * over-writes the previous one. Note that the wildcard "*" for messageType | |
262 | * or triggerEvent means any type or event, respectively. | |
263 | */ | |
264 | public void registerApplication(String messageType, String triggerEvent, ReceivingApplication handler) { | |
265 | 85 | applicationRouter.bindApplication(new AppRoutingDataImpl(messageType, triggerEvent, "*", "*"), handler); |
266 | 85 | } |
267 | ||
268 | /** | |
269 | * Registers the given application to handle messages corresponding to ALL | |
270 | * message types and trigger events. | |
271 | */ | |
272 | public synchronized void registerApplication(AppRoutingData appRouting, ReceivingApplication<? extends Message> application) { | |
273 | 30 | if (appRouting == null) { |
274 | 0 | throw new NullPointerException("appRouting must not be null"); |
275 | } | |
276 | 30 | applicationRouter.bindApplication(appRouting, application); |
277 | 30 | } |
278 | ||
279 | /** | |
280 | * Registers the given application to handle messages corresponding to ALL | |
281 | * message types and trigger events. | |
282 | */ | |
283 | public synchronized void registerApplication(ReceivingApplication<? extends Message> application) { | |
284 | ||
285 | 30 | registerApplication(new AppRoutingDataImpl("*", "*", "*", "*"), application); |
286 | 30 | } |
287 | ||
288 | /** | |
289 | * Unregisteres the first application that matches the routing data | |
290 | * | |
291 | * @param appRouting | |
292 | * @return true if an application was unregistered, false otherwise | |
293 | */ | |
294 | public synchronized boolean unregisterApplication(AppRoutingData appRouting) { | |
295 | 0 | if (appRouting == null) { |
296 | 0 | throw new NullPointerException("appRouting must not be null"); |
297 | } | |
298 | 0 | return applicationRouter.unbindApplication(appRouting); |
299 | } | |
300 | ||
301 | /** | |
302 | * Unregisteres the passed application | |
303 | * | |
304 | * @param application receiving application | |
305 | * @return true if an application was unregistered, false otherwise | |
306 | */ | |
307 | public synchronized boolean unregisterApplication(ReceivingApplication<? extends Message> application) { | |
308 | 0 | if (application == null) { |
309 | 0 | throw new NullPointerException("application must not be null"); |
310 | } | |
311 | 0 | return applicationRouter.unbindApplication(application); |
312 | } | |
313 | ||
314 | /** | |
315 | * Sets an exception handler which will be invoked in the event of a | |
316 | * failure during parsing, processing, or encoding of an | |
317 | * incoming message or its response. | |
318 | */ | |
319 | public synchronized void setExceptionHandler(ReceivingApplicationExceptionHandler exHandler) { | |
320 | 0 | applicationRouter.setExceptionHandler(exHandler); |
321 | 0 | } |
322 | ||
323 | ||
324 | /** | |
325 | * <p> | |
326 | * A convenience method for registering applications (using | |
327 | * <code>registerApplication() | |
328 | * </code>) with this service. Information about which Applications should | |
329 | * handle which messages is read from the given text file. Each line in the | |
330 | * file should have the following format (entries tab delimited): | |
331 | * </p> | |
332 | * <p> | |
333 | * message_type 	 trigger_event 	 application_class | |
334 | * </p> | |
335 | * <p> | |
336 | * message_type 	 trigger_event 	 application_class | |
337 | * </p> | |
338 | * <p> | |
339 | * Note that message type and event can be the wildcard "*", which means | |
340 | * any. | |
341 | * </p> | |
342 | * <p> | |
343 | * For example, if you write an Application called | |
344 | * org.yourorganiztion.ADTProcessor that processes several types of ADT | |
345 | * messages, and another called org.yourorganization.ResultProcessor that | |
346 | * processes result messages, you might have a file that looks like this: | |
347 | * </p> | |
348 | * <p> | |
349 | * ADT 	 * 	 org.yourorganization.ADTProcessor<br> | |
350 | * ORU 	 R01 	 org.yourorganization.ResultProcessor | |
351 | * </p> | |
352 | * <p> | |
353 | * Each class listed in this file must implement Application and must have a | |
354 | * zero-argument constructor. | |
355 | * </p> | |
356 | */ | |
357 | public void loadApplicationsFromFile(File f) throws IOException, | |
358 | HL7Exception, ClassNotFoundException, InstantiationException, | |
359 | IllegalAccessException { | |
360 | 0 | BufferedReader in = null; |
361 | try { | |
362 | 0 | in = new BufferedReader(new FileReader(f)); |
363 | String line; | |
364 | 0 | while ((line = in.readLine()) != null) { |
365 | // parse application registration information | |
366 | 0 | StringTokenizer tok = new StringTokenizer(line, "\t", false); |
367 | String type, event, className; | |
368 | ||
369 | 0 | if (tok.hasMoreTokens()) { // skip blank lines |
370 | try { | |
371 | 0 | type = tok.nextToken(); |
372 | 0 | event = tok.nextToken(); |
373 | 0 | className = tok.nextToken(); |
374 | 0 | } catch (NoSuchElementException ne) { |
375 | 0 | throw new HL7Exception( |
376 | "Can't register applications from file " | |
377 | 0 | + f.getName() |
378 | + ". The line '" | |
379 | + line | |
380 | + "' is not of the form: message_type [tab] trigger_event [tab] application_class."); | |
381 | 0 | } |
382 | ||
383 | try { | |
384 | @SuppressWarnings("unchecked") | |
385 | 0 | Class<? extends Application> appClass = (Class<? extends Application>) Class |
386 | 0 | .forName(className); // may throw |
387 | // ClassNotFoundException | |
388 | 0 | Application app = appClass.newInstance(); |
389 | 0 | registerApplication(type, event, new AppWrapper(app)); |
390 | 0 | } catch (ClassCastException cce) { |
391 | 0 | throw new HL7Exception("The specified class, " + className |
392 | + ", doesn't implement Application."); | |
393 | 0 | } |
394 | ||
395 | } | |
396 | 0 | } |
397 | } finally { | |
398 | 0 | if (in != null) { |
399 | try { | |
400 | 0 | in.close(); |
401 | 0 | } catch (IOException e) { |
402 | // don't care | |
403 | 0 | } |
404 | } | |
405 | } | |
406 | 0 | } |
407 | ||
408 | /** | |
409 | * Runnable that looks for closed Connections and discards them. It would be | |
410 | * nice to find a way to externalize this safely so that it could be re-used | |
411 | * by (for example) TestPanel. It could take a Vector of Connections as an | |
412 | * argument, instead of an HL7Service, but some problems might arise if | |
413 | * other threads were iterating through the Vector while this one was | |
414 | * removing elements from it. | |
415 | * <p/> | |
416 | * Note: this could be started as daemon, so we don't need to care about | |
417 | * termination. | |
418 | */ | |
419 | private static class ConnectionCleaner extends Service { | |
420 | ||
421 | private final HL7Service service; | |
422 | ||
423 | public ConnectionCleaner(HL7Service service) { | |
424 | 145 | super("ConnectionCleaner", service.getExecutorService()); |
425 | 145 | this.service = service; |
426 | 145 | } |
427 | ||
428 | @Override | |
429 | public void start() { | |
430 | 135 | log.info("Starting ConnectionCleaner service"); |
431 | 135 | super.start(); |
432 | 135 | } |
433 | ||
434 | public void handle() { | |
435 | try { | |
436 | 3378 | Thread.sleep(500); |
437 | 3305 | synchronized (service) { |
438 | 3305 | Iterator<Connection> it = service.getRemoteConnections() |
439 | 3305 | .iterator(); |
440 | 3657 | while (it.hasNext()) { |
441 | 352 | Connection conn = it.next(); |
442 | 352 | if (!conn.isOpen()) { |
443 | 324 | log.debug( |
444 | "Removing connection from {} from connection list", | |
445 | 162 | conn.getRemoteAddress().getHostAddress()); |
446 | 162 | it.remove(); |
447 | 162 | service.notifyListeners(conn); |
448 | } | |
449 | 352 | } |
450 | 3305 | } |
451 | 43 | } catch (InterruptedException e) { |
452 | // don't care | |
453 | 3305 | } |
454 | 3348 | } |
455 | ||
456 | } | |
457 | ||
458 | } |