Coverage Report - ca.uhn.hl7v2.app.AcceptorThread
 
Classes in this File Line Coverage Branch Coverage Complexity
AcceptorThread
79%
38/48
70%
7/10
2.5
AcceptorThread$AcceptedSocket
83%
5/6
50%
1/2
2.5
 
 1  
 /**
 2  
 The contents of this file are subject to the Mozilla Public License Version 1.1 
 3  
 (the "License"); you may not use this file except in compliance with the License. 
 4  
 You may obtain a copy of the License at http://www.mozilla.org/MPL/ 
 5  
 Software distributed under the License is distributed on an "AS IS" basis, 
 6  
 WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the 
 7  
 specific language governing rights and limitations under the License. 
 8  
 
 9  
 The Original Code is "AcceptorThread.java".  Description: 
 10  
 "A TCP/IP-based HL7 Service that uses separate ports for inbound and outbound messages." 
 11  
 
 12  
 The Initial Developer of the Original Code is University Health Network. Copyright (C) 
 13  
 2001.  All Rights Reserved. 
 14  
 
 15  
 Contributor(s): ______________________________________. 
 16  
 
 17  
 Alternatively, the contents of this file may be used under the terms of the 
 18  
 GNU General Public License (the "GPL"), in which case the provisions of the GPL are 
 19  
 applicable instead of those above.  If you wish to allow use of your version of this 
 20  
 file only under the terms of the GPL and not to allow others to use your version 
 21  
 of this file under the MPL, indicate your decision by deleting  the provisions above 
 22  
 and replace  them with the notice and other provisions required by the GPL License.  
 23  
 If you do not delete the provisions above, a recipient may use your version of 
 24  
 this file under either the MPL or the GPL. 
 25  
  */
 26  
 package ca.uhn.hl7v2.app;
 27  
 
 28  
 import java.io.IOException;
 29  
 import java.net.InetSocketAddress;
 30  
 import java.net.ServerSocket;
 31  
 import java.net.Socket;
 32  
 import java.net.SocketException;
 33  
 import java.net.SocketTimeoutException;
 34  
 import java.util.concurrent.BlockingQueue;
 35  
 import java.util.concurrent.ExecutorService;
 36  
 
 37  
 import org.slf4j.Logger;
 38  
 import org.slf4j.LoggerFactory;
 39  
 
 40  
 import ca.uhn.hl7v2.concurrent.Service;
 41  
 import ca.uhn.hl7v2.util.SocketFactory;
 42  
 import ca.uhn.hl7v2.util.StandardSocketFactory;
 43  
 
 44  
 /**
 45  
  * A Runnable that accepts connections on a ServerSocket and adds them to a
 46  
  * {@link BlockingQueue}, so that they can be handled asynchronously. After
 47  
  * stop() is called, the ServerSocket is closed.
 48  
  */
 49  
 class AcceptorThread extends Service {
 50  
 
 51  
         /**
 52  
          * @deprecated See {@link StandardSocketFactory#DEFAULT_ACCEPTED_SOCKET_TIMEOUT}
 53  
          */
 54  
         @Deprecated
 55  
         static final int TIMEOUT = 500;
 56  
         
 57  10
         private static final Logger log = LoggerFactory
 58  5
                         .getLogger(AcceptorThread.class);
 59  
         private int port;
 60  160
         private boolean tls = false;
 61  
         private ServerSocket ss;
 62  
         private final BlockingQueue<AcceptedSocket> queue;
 63  
         private final SocketFactory socketFactory;
 64  
 
 65  
         public AcceptorThread(ServerSocket serverSocket, int port, ExecutorService service,
 66  
                         BlockingQueue<AcceptedSocket> queue) throws IOException,
 67  
                         SocketException {
 68  0
                 this(port, false, service, queue);
 69  0
                 this.ss = serverSocket;
 70  0
         }
 71  
 
 72  
         public AcceptorThread(int port, ExecutorService service,
 73  
                         BlockingQueue<AcceptedSocket> queue) throws IOException,
 74  
                         SocketException {
 75  0
                 this(port, false, service, queue);
 76  0
         }
 77  
 
 78  
         public AcceptorThread(int port, boolean tls, ExecutorService service,
 79  
                         BlockingQueue<AcceptedSocket> queue) throws IOException,
 80  
                         SocketException {
 81  10
                 this(port, tls, service, queue, null);
 82  10
         }
 83  
 
 84  
         public AcceptorThread(int port, boolean tls, ExecutorService service, BlockingQueue<AcceptedSocket> queue, SocketFactory socketFactory) {
 85  160
                 super("Socket Acceptor", service);
 86  160
                 this.port = port;
 87  160
                 this.queue = queue;
 88  160
                 this.tls = tls;
 89  160
                 if (socketFactory == null) {
 90  10
                         socketFactory = new StandardSocketFactory();
 91  
                 }
 92  160
                 this.socketFactory = socketFactory;
 93  160
         }
 94  
 
 95  
         @Override
 96  
         protected void afterStartup() {
 97  160
                 super.afterStartup();
 98  
                 try {
 99  160
                         if (this.tls) {
 100  0
                                 ss = socketFactory.createTlsServerSocket();
 101  
                         } else {
 102  160
                                 ss = socketFactory.createServerSocket();
 103  
                         }
 104  160
                         ss.bind(new InetSocketAddress(port));
 105  155
                         ss.setSoTimeout(500);
 106  5
                 } catch (IOException e) {
 107  5
                         final String message = String.format("Unable to create ServerSocket on port %d", port);
 108  5
                         throw new RuntimeException(message, e);
 109  155
                 }
 110  155
         }
 111  
 
 112  
         @Override
 113  
         protected void handle() {
 114  
                 try {
 115  3698
                         Socket s = ss.accept();
 116  305
                         socketFactory.configureNewAcceptedSocket(s);
 117  305
                         if (!queue.offer(new AcceptedSocket(s))) {
 118  5
                                 log.error("Denied enqueuing server-side socket {}", s);
 119  5
                                 s.close();
 120  
                         } else
 121  300
                                 log.debug("Enqueued server-side socket {}", s);
 122  3363
                 } catch (SocketTimeoutException e) { /* OK - just timed out */
 123  3363
                         log.trace("No connection established while waiting");
 124  0
                 } catch (IOException e) {
 125  0
                         log.error("Error while accepting connections", e);
 126  3668
                 }
 127  3668
         }
 128  
 
 129  
         @Override
 130  
         protected void afterTermination() {
 131  130
                 super.afterTermination();
 132  
                 try {
 133  130
                         if (ss != null && !ss.isClosed())
 134  130
                                 ss.close();
 135  0
                 } catch (IOException e) {
 136  0
                         log.warn("Error during stopping the thread", e);
 137  130
                 }
 138  130
         }
 139  
 
 140  
         class AcceptedSocket {
 141  
                 Socket socket;
 142  
                 AcceptorThread origin;
 143  
 
 144  305
                 public AcceptedSocket(Socket socket) {
 145  305
                         if (socket == null)
 146  0
                                 throw new IllegalArgumentException("Socket must not be null");
 147  305
                         this.socket = socket;
 148  305
                         this.origin = AcceptorThread.this;
 149  305
                 }
 150  
 
 151  
         }
 152  
 
 153  
 }