001package ca.uhn.hl7v2.hoh.relay.listener;
002
003import java.util.ArrayList;
004import java.util.List;
005import java.util.concurrent.Executors;
006import java.util.concurrent.ThreadFactory;
007import java.util.concurrent.atomic.AtomicInteger;
008
009import ca.uhn.hl7v2.model.Message;
010import org.springframework.beans.factory.BeanNameAware;
011import org.springframework.beans.factory.DisposableBean;
012import org.springframework.beans.factory.InitializingBean;
013
014import ca.uhn.hl7v2.DefaultHapiContext;
015import ca.uhn.hl7v2.app.SimpleServer;
016import ca.uhn.hl7v2.hoh.util.Validate;
017import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
018import ca.uhn.hl7v2.parser.GenericModelClassFactory;
019import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
020import ca.uhn.hl7v2.protocol.ReceivingApplication;
021import ca.uhn.hl7v2.util.StandardSocketFactory;
022
023public class RelayMllpListener implements InitializingBean, DisposableBean, IRelayListener, BeanNameAware {
024
025        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(RelayMllpListener.class);
026        private List<ReceivingApplication<? extends Message>> myApplications = new ArrayList<ReceivingApplication<? extends Message>>();
027        private List<AppRoutingData> myAppRoutingData = new ArrayList<AppRoutingData>();
028        private String myBeanName;
029        private int myPort;
030        private SimpleServer myServer;
031        private AtomicInteger threadNum = new AtomicInteger(1);
032        private DefaultHapiContext myContext;
033
034        /**
035         * Fired automatically by the container when
036         * the bean is ready to start
037         */
038        public void afterPropertiesSet() throws Exception {
039                if (myPort <= 0) {
040                        throw new IllegalStateException("Port not set");
041                }
042                
043                myContext = new DefaultHapiContext();
044                StandardSocketFactory socketFactory = new StandardSocketFactory();
045                socketFactory.setAcceptedSocketTimeout(2000);
046                myContext.setSocketFactory(socketFactory);
047                myContext.setExecutorService(Executors.newCachedThreadPool(new MyThreadFactory()));
048                myContext.setLowerLayerProtocol(new MinLowerLayerProtocol(true));
049                myContext.setModelClassFactory(new GenericModelClassFactory());
050                myServer = myContext.newServer(myPort, false);
051
052                for (int i = 0; i < myAppRoutingData.size(); i++) {
053                        myServer.registerApplication(myAppRoutingData.get(i), myApplications.get(i));
054                }
055                
056                ourLog.info("Starting listener on port {}", myPort);
057                myServer.startAndWait();
058                ourLog.info("Listener on port {} has started, and is ready for processing", myPort);
059
060                if (myServer.getServiceExitedWithException() != null) {
061                        Throwable ex = myServer.getServiceExitedWithException();
062                        ourLog.error("Server failed to start", ex);
063                        if (ex instanceof Exception) {
064                                throw (Exception) ex;
065                        } else {
066                                throw new Error(ex);
067                        }
068                }
069
070        }
071
072        
073        /**
074         * Fired automatically by the container when
075         * the bean is shutting down
076         */
077        public void destroy() throws Exception {
078                ourLog.info("Stopping listener on port {}", myPort);
079                myServer.stopAndWait();
080                ourLog.info("Listener on port {} has stopped", myPort);
081                
082                ourLog.info("Closing HAPI Context Object");
083                myContext.close();
084                ourLog.info("Done closing HAPI Context object");
085        }
086
087        public String getBeanName() {
088                return myBeanName;
089        }
090
091
092        public void registerApplication(AppRoutingData theAppRouting, ReceivingApplication<? extends Message> theReceivingApplication) {
093                Validate.notNull(theAppRouting, "appRouting");
094                Validate.notNull(theReceivingApplication, "receivingApplication");
095                
096                if (myServer != null) {
097                        myServer.registerApplication(theAppRouting, theReceivingApplication);
098                } else {
099                        myAppRoutingData.add(theAppRouting);
100                        myApplications.add(theReceivingApplication);
101                }
102        }
103
104
105        public void setBeanName(String theBeanName) {
106                myBeanName = theBeanName;
107        }
108
109        public void setPort(int thePort) {
110                myPort = thePort;
111        }
112
113        private class MyThreadFactory implements ThreadFactory {
114
115                private ThreadGroup group;
116
117                private MyThreadFactory() {
118                        group = Thread.currentThread().getThreadGroup();
119                }
120
121                public Thread newThread(Runnable theR) {
122                        String name = "hoh-port-" + myPort + "-worker-" + threadNum.getAndIncrement();
123                        Thread t = new Thread(group, theR, name, 0);
124                        if (t.isDaemon()) {
125                                t.setDaemon(false);
126                        }
127                        if (t.getPriority() != Thread.NORM_PRIORITY) {
128                                t.setPriority(Thread.NORM_PRIORITY);
129                        }
130                        return t;
131                }
132
133        }
134
135
136}