1 package ca.uhn.hl7v2.hoh.relay.listener;
2
3 import java.util.ArrayList;
4 import java.util.List;
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.ThreadFactory;
7 import java.util.concurrent.atomic.AtomicInteger;
8
9 import ca.uhn.hl7v2.model.Message;
10 import org.springframework.beans.factory.BeanNameAware;
11 import org.springframework.beans.factory.DisposableBean;
12 import org.springframework.beans.factory.InitializingBean;
13
14 import ca.uhn.hl7v2.DefaultHapiContext;
15 import ca.uhn.hl7v2.app.SimpleServer;
16 import ca.uhn.hl7v2.hoh.util.Validate;
17 import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
18 import ca.uhn.hl7v2.parser.GenericModelClassFactory;
19 import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
20 import ca.uhn.hl7v2.protocol.ReceivingApplication;
21 import ca.uhn.hl7v2.util.StandardSocketFactory;
22
23 public class RelayMllpListener implements InitializingBean, DisposableBean, IRelayListener, BeanNameAware {
24
25 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(RelayMllpListener.class);
26 private final List<ReceivingApplication<? extends Message>> myApplications = new ArrayList<>();
27 private final List<AppRoutingData> myAppRoutingData = new ArrayList<>();
28 private String myBeanName;
29 private int myPort;
30 private SimpleServer myServer;
31 private final AtomicInteger threadNum = new AtomicInteger(1);
32 private DefaultHapiContext myContext;
33
34
35
36
37
38 public void afterPropertiesSet() throws Exception {
39 if (myPort <= 0) {
40 throw new IllegalStateException("Port not set");
41 }
42
43 myContext = new DefaultHapiContext();
44 StandardSocketFactory socketFactory = new StandardSocketFactory();
45 socketFactory.setAcceptedSocketTimeout(2000);
46 myContext.setSocketFactory(socketFactory);
47 myContext.setExecutorService(Executors.newCachedThreadPool(new MyThreadFactory()));
48 myContext.setLowerLayerProtocol(new MinLowerLayerProtocol(true));
49 myContext.setModelClassFactory(new GenericModelClassFactory());
50 myServer = myContext.newServer(myPort, false);
51
52 for (int i = 0; i < myAppRoutingData.size(); i++) {
53 myServer.registerApplication(myAppRoutingData.get(i), myApplications.get(i));
54 }
55
56 ourLog.info("Starting listener on port {}", myPort);
57 myServer.startAndWait();
58 ourLog.info("Listener on port {} has started, and is ready for processing", myPort);
59
60 if (myServer.getServiceExitedWithException() != null) {
61 Throwable ex = myServer.getServiceExitedWithException();
62 ourLog.error("Server failed to start", ex);
63 if (ex instanceof Exception) {
64 throw (Exception) ex;
65 } else {
66 throw new Error(ex);
67 }
68 }
69
70 }
71
72
73
74
75
76
77 public void destroy() throws Exception {
78 ourLog.info("Stopping listener on port {}", myPort);
79 myServer.stopAndWait();
80 ourLog.info("Listener on port {} has stopped", myPort);
81
82 ourLog.info("Closing HAPI Context Object");
83 myContext.close();
84 ourLog.info("Done closing HAPI Context object");
85 }
86
87 public String getBeanName() {
88 return myBeanName;
89 }
90
91
92 public void registerApplication(AppRoutingData theAppRouting, ReceivingApplication<? extends Message> theReceivingApplication) {
93 Validate.notNull(theAppRouting, "appRouting");
94 Validate.notNull(theReceivingApplication, "receivingApplication");
95
96 if (myServer != null) {
97 myServer.registerApplication(theAppRouting, theReceivingApplication);
98 } else {
99 myAppRoutingData.add(theAppRouting);
100 myApplications.add(theReceivingApplication);
101 }
102 }
103
104
105 public void setBeanName(String theBeanName) {
106 myBeanName = theBeanName;
107 }
108
109 public void setPort(int thePort) {
110 myPort = thePort;
111 }
112
113 private class MyThreadFactory implements ThreadFactory {
114
115 private final ThreadGroup group;
116
117 private MyThreadFactory() {
118 group = Thread.currentThread().getThreadGroup();
119 }
120
121 public Thread newThread(Runnable theR) {
122 String name = "hoh-port-" + myPort + "-worker-" + threadNum.getAndIncrement();
123 Thread t = new Thread(group, theR, name, 0);
124 if (t.isDaemon()) {
125 t.setDaemon(false);
126 }
127 if (t.getPriority() != Thread.NORM_PRIORITY) {
128 t.setPriority(Thread.NORM_PRIORITY);
129 }
130 return t;
131 }
132
133 }
134
135
136 }