1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package ca.uhn.hl7v2.app;
27
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketTimeoutException;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.ExecutorService;
35
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import ca.uhn.hl7v2.concurrent.Service;
40 import ca.uhn.hl7v2.util.SocketFactory;
41 import ca.uhn.hl7v2.util.StandardSocketFactory;
42
43
44
45
46
47
48 class AcceptorThread extends Service {
49
50
51
52
53 @Deprecated
54 static final int TIMEOUT = 500;
55
56 private static final Logger log = LoggerFactory
57 .getLogger(AcceptorThread.class);
58 private final int port;
59 private final boolean tls;
60 private ServerSocket ss;
61 private final BlockingQueue<AcceptedSocket> queue;
62 private final SocketFactory socketFactory;
63
64 public AcceptorThread(ServerSocket serverSocket, int port, ExecutorService service,
65 BlockingQueue<AcceptedSocket> queue) throws IOException {
66 this(port, false, service, queue);
67 this.ss = serverSocket;
68 }
69
70 public AcceptorThread(int port, ExecutorService service,
71 BlockingQueue<AcceptedSocket> queue) throws IOException {
72 this(port, false, service, queue);
73 }
74
75 public AcceptorThread(int port, boolean tls, ExecutorService service,
76 BlockingQueue<AcceptedSocket> queue) {
77 this(port, tls, service, queue, null);
78 }
79
80 public AcceptorThread(int port, boolean tls, ExecutorService service, BlockingQueue<AcceptedSocket> queue, SocketFactory socketFactory) {
81 super("Socket Acceptor", service);
82 this.port = port;
83 this.queue = queue;
84 this.tls = tls;
85 if (socketFactory == null) {
86 socketFactory = new StandardSocketFactory();
87 }
88 this.socketFactory = socketFactory;
89 }
90
91 @Override
92 protected void afterStartup() {
93 super.afterStartup();
94 try {
95 if (this.tls) {
96 ss = socketFactory.createTlsServerSocket();
97 } else {
98 ss = socketFactory.createServerSocket();
99 }
100 ss.bind(new InetSocketAddress(port));
101 ss.setSoTimeout(500);
102 } catch (IOException e) {
103 final String message = String.format("Unable to create ServerSocket on port %d", port);
104 throw new RuntimeException(message, e);
105 }
106 }
107
108 @Override
109 protected void handle() {
110 try {
111 Socket s = ss.accept();
112 socketFactory.configureNewAcceptedSocket(s);
113 if (!queue.offer(new AcceptedSocket(s))) {
114 log.error("Denied enqueuing server-side socket {}", s);
115 s.close();
116 } else
117 log.debug("Enqueued server-side socket {}", s);
118 } catch (SocketTimeoutException e) {
119 log.trace("No connection established while waiting");
120 } catch (IOException e) {
121 log.error("Error while accepting connections", e);
122 }
123 }
124
125 @Override
126 protected void afterTermination() {
127 super.afterTermination();
128 try {
129 if (ss != null && !ss.isClosed())
130 ss.close();
131 } catch (IOException e) {
132 log.warn("Error during stopping the thread", e);
133 }
134 }
135
136 class AcceptedSocket {
137 Socket socket;
138 AcceptorThread origin;
139
140 public AcceptedSocket(Socket socket) {
141 if (socket == null)
142 throw new IllegalArgumentException("Socket must not be null");
143 this.socket = socket;
144 this.origin = AcceptorThread.this;
145 }
146
147 }
148
149 }