001package ca.uhn.hl7v2.hoh.relay.listener; 002 003import java.util.ArrayList; 004import java.util.List; 005import java.util.concurrent.Executors; 006import java.util.concurrent.ThreadFactory; 007import java.util.concurrent.atomic.AtomicInteger; 008 009import ca.uhn.hl7v2.model.Message; 010import org.springframework.beans.factory.BeanNameAware; 011import org.springframework.beans.factory.DisposableBean; 012import org.springframework.beans.factory.InitializingBean; 013 014import ca.uhn.hl7v2.DefaultHapiContext; 015import ca.uhn.hl7v2.app.SimpleServer; 016import ca.uhn.hl7v2.hoh.util.Validate; 017import ca.uhn.hl7v2.llp.MinLowerLayerProtocol; 018import ca.uhn.hl7v2.parser.GenericModelClassFactory; 019import ca.uhn.hl7v2.protocol.ApplicationRouter.AppRoutingData; 020import ca.uhn.hl7v2.protocol.ReceivingApplication; 021import ca.uhn.hl7v2.util.StandardSocketFactory; 022 023public class RelayMllpListener implements InitializingBean, DisposableBean, IRelayListener, BeanNameAware { 024 025 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(RelayMllpListener.class); 026 private List<ReceivingApplication<? extends Message>> myApplications = new ArrayList<ReceivingApplication<? extends Message>>(); 027 private List<AppRoutingData> myAppRoutingData = new ArrayList<AppRoutingData>(); 028 private String myBeanName; 029 private int myPort; 030 private SimpleServer myServer; 031 private AtomicInteger threadNum = new AtomicInteger(1); 032 private DefaultHapiContext myContext; 033 034 /** 035 * Fired automatically by the container when 036 * the bean is ready to start 037 */ 038 public void afterPropertiesSet() throws Exception { 039 if (myPort <= 0) { 040 throw new IllegalStateException("Port not set"); 041 } 042 043 myContext = new DefaultHapiContext(); 044 StandardSocketFactory socketFactory = new StandardSocketFactory(); 045 socketFactory.setAcceptedSocketTimeout(2000); 046 myContext.setSocketFactory(socketFactory); 047 myContext.setExecutorService(Executors.newCachedThreadPool(new MyThreadFactory())); 048 myContext.setLowerLayerProtocol(new MinLowerLayerProtocol(true)); 049 myContext.setModelClassFactory(new GenericModelClassFactory()); 050 myServer = myContext.newServer(myPort, false); 051 052 for (int i = 0; i < myAppRoutingData.size(); i++) { 053 myServer.registerApplication(myAppRoutingData.get(i), myApplications.get(i)); 054 } 055 056 ourLog.info("Starting listener on port {}", myPort); 057 myServer.startAndWait(); 058 ourLog.info("Listener on port {} has started, and is ready for processing", myPort); 059 060 if (myServer.getServiceExitedWithException() != null) { 061 Throwable ex = myServer.getServiceExitedWithException(); 062 ourLog.error("Server failed to start", ex); 063 if (ex instanceof Exception) { 064 throw (Exception) ex; 065 } else { 066 throw new Error(ex); 067 } 068 } 069 070 } 071 072 073 /** 074 * Fired automatically by the container when 075 * the bean is shutting down 076 */ 077 public void destroy() throws Exception { 078 ourLog.info("Stopping listener on port {}", myPort); 079 myServer.stopAndWait(); 080 ourLog.info("Listener on port {} has stopped", myPort); 081 082 ourLog.info("Closing HAPI Context Object"); 083 myContext.close(); 084 ourLog.info("Done closing HAPI Context object"); 085 } 086 087 public String getBeanName() { 088 return myBeanName; 089 } 090 091 092 public void registerApplication(AppRoutingData theAppRouting, ReceivingApplication<? extends Message> theReceivingApplication) { 093 Validate.notNull(theAppRouting, "appRouting"); 094 Validate.notNull(theReceivingApplication, "receivingApplication"); 095 096 if (myServer != null) { 097 myServer.registerApplication(theAppRouting, theReceivingApplication); 098 } else { 099 myAppRoutingData.add(theAppRouting); 100 myApplications.add(theReceivingApplication); 101 } 102 } 103 104 105 public void setBeanName(String theBeanName) { 106 myBeanName = theBeanName; 107 } 108 109 public void setPort(int thePort) { 110 myPort = thePort; 111 } 112 113 private class MyThreadFactory implements ThreadFactory { 114 115 private ThreadGroup group; 116 117 private MyThreadFactory() { 118 group = Thread.currentThread().getThreadGroup(); 119 } 120 121 public Thread newThread(Runnable theR) { 122 String name = "hoh-port-" + myPort + "-worker-" + threadNum.getAndIncrement(); 123 Thread t = new Thread(group, theR, name, 0); 124 if (t.isDaemon()) { 125 t.setDaemon(false); 126 } 127 if (t.getPriority() != Thread.NORM_PRIORITY) { 128 t.setPriority(Thread.NORM_PRIORITY); 129 } 130 return t; 131 } 132 133 } 134 135 136}