View Javadoc
1   package ca.uhn.hl7v2.hoh.relay.listener;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   import java.util.concurrent.Executors;
6   import java.util.concurrent.ThreadFactory;
7   import java.util.concurrent.atomic.AtomicInteger;
8   
9   import ca.uhn.hl7v2.model.Message;
10  import org.springframework.beans.factory.BeanNameAware;
11  import org.springframework.beans.factory.DisposableBean;
12  import org.springframework.beans.factory.InitializingBean;
13  
14  import ca.uhn.hl7v2.DefaultHapiContext;
15  import ca.uhn.hl7v2.app.SimpleServer;
16  import ca.uhn.hl7v2.hoh.util.Validate;
17  import ca.uhn.hl7v2.llp.MinLowerLayerProtocol;
18  import ca.uhn.hl7v2.parser.GenericModelClassFactory;
19  import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData;
20  import ca.uhn.hl7v2.protocol.ReceivingApplication;
21  import ca.uhn.hl7v2.util.StandardSocketFactory;
22  
23  public class RelayMllpListener implements InitializingBean, DisposableBean, IRelayListener, BeanNameAware {
24  
25  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(RelayMllpListener.class);
26  	private final List<ReceivingApplication<? extends Message>> myApplications = new ArrayList<>();
27  	private final List<AppRoutingData> myAppRoutingData = new ArrayList<>();
28  	private String myBeanName;
29  	private int myPort;
30  	private SimpleServer myServer;
31  	private final AtomicInteger threadNum	= new AtomicInteger(1);
32  	private DefaultHapiContext myContext;
33  
34  	/**
35  	 * Fired automatically by the container when
36  	 * the bean is ready to start
37  	 */
38  	public void afterPropertiesSet() throws Exception {
39  		if (myPort <= 0) {
40  			throw new IllegalStateException("Port not set");
41  		}
42  		
43  		myContext = new DefaultHapiContext();
44  		StandardSocketFactory socketFactory = new StandardSocketFactory();
45  		socketFactory.setAcceptedSocketTimeout(2000);
46  		myContext.setSocketFactory(socketFactory);
47  		myContext.setExecutorService(Executors.newCachedThreadPool(new MyThreadFactory()));
48  		myContext.setLowerLayerProtocol(new MinLowerLayerProtocol(true));
49  		myContext.setModelClassFactory(new GenericModelClassFactory());
50  		myServer = myContext.newServer(myPort, false);
51  
52  		for (int i = 0; i < myAppRoutingData.size(); i++) {
53  			myServer.registerApplication(myAppRoutingData.get(i), myApplications.get(i));
54  		}
55  		
56  		ourLog.info("Starting listener on port {}", myPort);
57  		myServer.startAndWait();
58  		ourLog.info("Listener on port {} has started, and is ready for processing", myPort);
59  
60  		if (myServer.getServiceExitedWithException() != null) {
61  			Throwable ex = myServer.getServiceExitedWithException();
62  			ourLog.error("Server failed to start", ex);
63  			if (ex instanceof Exception) {
64  				throw (Exception) ex;
65  			} else {
66  				throw new Error(ex);
67  			}
68  		}
69  
70  	}
71  
72  	
73  	/**
74  	 * Fired automatically by the container when
75  	 * the bean is shutting down
76  	 */
77  	public void destroy() throws Exception {
78  		ourLog.info("Stopping listener on port {}", myPort);
79  		myServer.stopAndWait();
80  		ourLog.info("Listener on port {} has stopped", myPort);
81  		
82  		ourLog.info("Closing HAPI Context Object");
83  		myContext.close();
84  		ourLog.info("Done closing HAPI Context object");
85  	}
86  
87  	public String getBeanName() {
88  		return myBeanName;
89  	}
90  
91  
92  	public void registerApplication(AppRoutingData theAppRouting, ReceivingApplication<? extends Message> theReceivingApplication) {
93  		Validate.notNull(theAppRouting, "appRouting");
94  		Validate.notNull(theReceivingApplication, "receivingApplication");
95  		
96  		if (myServer != null) {
97  			myServer.registerApplication(theAppRouting, theReceivingApplication);
98  		} else {
99  			myAppRoutingData.add(theAppRouting);
100 			myApplications.add(theReceivingApplication);
101 		}
102 	}
103 
104 
105 	public void setBeanName(String theBeanName) {
106 		myBeanName = theBeanName;
107 	}
108 
109 	public void setPort(int thePort) {
110 		myPort = thePort;
111 	}
112 
113 	private class MyThreadFactory implements ThreadFactory {
114 
115 		private final ThreadGroup group;
116 
117 		private MyThreadFactory() {
118 			group = Thread.currentThread().getThreadGroup();
119 		}
120 
121 		public Thread newThread(Runnable theR) {
122 			String name = "hoh-port-" + myPort + "-worker-" + threadNum.getAndIncrement();
123 			Thread t = new Thread(group, theR, name, 0);
124 			if (t.isDaemon()) {
125 				t.setDaemon(false);
126 			}
127 			if (t.getPriority() != Thread.NORM_PRIORITY) {
128 				t.setPriority(Thread.NORM_PRIORITY);
129 			}
130 			return t;
131 		}
132 
133 	}
134 
135 
136 }