src/core/net/sf/basedb/util/jobagent/JobAgentServerConnection.java

Code
Comments
Other
Rev Date Author Line
2632 08 Sep 06 nicklas 1 /**
2632 08 Sep 06 nicklas 2   $Id$
2632 08 Sep 06 nicklas 3
3675 16 Aug 07 jari 4   Copyright (C) 2006 Nicklas Nordborg
2632 08 Sep 06 nicklas 5
2632 08 Sep 06 nicklas 6   This file is part of BASE - BioArray Software Environment.
2632 08 Sep 06 nicklas 7   Available at http://base.thep.lu.se/
2632 08 Sep 06 nicklas 8
2632 08 Sep 06 nicklas 9   BASE is free software; you can redistribute it and/or
2632 08 Sep 06 nicklas 10   modify it under the terms of the GNU General Public License
4479 05 Sep 08 jari 11   as published by the Free Software Foundation; either version 3
2632 08 Sep 06 nicklas 12   of the License, or (at your option) any later version.
2632 08 Sep 06 nicklas 13
2632 08 Sep 06 nicklas 14   BASE is distributed in the hope that it will be useful,
2632 08 Sep 06 nicklas 15   but WITHOUT ANY WARRANTY; without even the implied warranty of
2632 08 Sep 06 nicklas 16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
2632 08 Sep 06 nicklas 17   GNU General Public License for more details.
2632 08 Sep 06 nicklas 18
2632 08 Sep 06 nicklas 19   You should have received a copy of the GNU General Public License
4515 11 Sep 08 jari 20   along with BASE. If not, see <http://www.gnu.org/licenses/>.
2632 08 Sep 06 nicklas 21 */
2632 08 Sep 06 nicklas 22 package net.sf.basedb.util.jobagent;
2632 08 Sep 06 nicklas 23
2632 08 Sep 06 nicklas 24 import java.io.IOException;
2634 12 Sep 06 nicklas 25 import java.net.InetSocketAddress;
2632 08 Sep 06 nicklas 26 import java.net.Socket;
2634 12 Sep 06 nicklas 27 import java.nio.channels.ClosedByInterruptException;
2634 12 Sep 06 nicklas 28 import java.nio.channels.ServerSocketChannel;
2634 12 Sep 06 nicklas 29 import java.nio.channels.SocketChannel;
2632 08 Sep 06 nicklas 30
6444 09 Apr 14 nicklas 31 import org.slf4j.Logger;
2641 14 Sep 06 nicklas 32
2634 12 Sep 06 nicklas 33 import net.sf.basedb.util.SocketUtil;
2634 12 Sep 06 nicklas 34
2632 08 Sep 06 nicklas 35 /**
2632 08 Sep 06 nicklas 36   This class is used by job agents to listen for incoming
2632 08 Sep 06 nicklas 37   requests from client applications. 
2632 08 Sep 06 nicklas 38   
2632 08 Sep 06 nicklas 39   @author nicklas
2632 08 Sep 06 nicklas 40   @version 2.0
2632 08 Sep 06 nicklas 41   @base.modified $Date$
2632 08 Sep 06 nicklas 42 */
2632 08 Sep 06 nicklas 43 public class JobAgentServerConnection
7551 12 Dec 18 nicklas 44   implements AutoCloseable
2632 08 Sep 06 nicklas 45 {
2632 08 Sep 06 nicklas 46   private final int port;
2632 08 Sep 06 nicklas 47   private final RequestHandler requestHandler;
2641 14 Sep 06 nicklas 48   private final Logger logger;
2632 08 Sep 06 nicklas 49   private Thread listener;
2632 08 Sep 06 nicklas 50   
2634 12 Sep 06 nicklas 51   /**
2634 12 Sep 06 nicklas 52     Create a new server connection. The {@link #open()} method must be called
2634 12 Sep 06 nicklas 53     to start listening for incoming connections.
2634 12 Sep 06 nicklas 54     
2634 12 Sep 06 nicklas 55     @param port The port to listing to for incoming connections
2634 12 Sep 06 nicklas 56     @param requestHandler A handler for taking care of incoming requests.
2634 12 Sep 06 nicklas 57       The handler must be thread-safe and able to handle multiple 
2634 12 Sep 06 nicklas 58       requests at the same time
2641 14 Sep 06 nicklas 59     @param logger A logger object for logging debug and other information
2641 14 Sep 06 nicklas 60       or null if no logging is wanted
2634 12 Sep 06 nicklas 61   */
2641 14 Sep 06 nicklas 62   public JobAgentServerConnection(int port, RequestHandler requestHandler, Logger logger)
2632 08 Sep 06 nicklas 63   {
2632 08 Sep 06 nicklas 64     this.port = port;
2632 08 Sep 06 nicklas 65     this.requestHandler = requestHandler;
2641 14 Sep 06 nicklas 66     this.logger = logger;
2632 08 Sep 06 nicklas 67   }
2632 08 Sep 06 nicklas 68   
2634 12 Sep 06 nicklas 69   /**
2634 12 Sep 06 nicklas 70     Start listening for incoming connections. This method starts a new listener 
2634 12 Sep 06 nicklas 71     thread that answers incoming requests. For each request a new thread
2981 30 Nov 06 nicklas 72     is created and control is passed on to the {@link RequestHandler}. If a listener
2634 12 Sep 06 nicklas 73     has already been create thi method does nothing.
2634 12 Sep 06 nicklas 74     @throws IOException If there is an error
2634 12 Sep 06 nicklas 75   */
2632 08 Sep 06 nicklas 76   public void open()
2632 08 Sep 06 nicklas 77     throws IOException
2632 08 Sep 06 nicklas 78   {
2634 12 Sep 06 nicklas 79     if (listener != null) return;
2641 14 Sep 06 nicklas 80     if (logger != null) logger.info("Opening listener on port " + port);
2634 12 Sep 06 nicklas 81     
2634 12 Sep 06 nicklas 82     ServerSocketChannel channel = ServerSocketChannel.open();
2634 12 Sep 06 nicklas 83     channel.socket().bind(new InetSocketAddress(port));
2634 12 Sep 06 nicklas 84     
2641 14 Sep 06 nicklas 85     listener = new Thread(new ListenerThread(channel, requestHandler, logger), 
2641 14 Sep 06 nicklas 86         "ListenerThread."+this.toString());
2632 08 Sep 06 nicklas 87     listener.start();
2641 14 Sep 06 nicklas 88     if (logger != null) logger.info("Now listening on port " + port);
2632 08 Sep 06 nicklas 89   }
2632 08 Sep 06 nicklas 90   
2634 12 Sep 06 nicklas 91   /**
2634 12 Sep 06 nicklas 92     Stop listening for incoming connections. The thread that listens for incoming 
2634 12 Sep 06 nicklas 93     connections is killed. Any executing requests are not affected by this method.
2634 12 Sep 06 nicklas 94   */
7551 12 Dec 18 nicklas 95   @Override
2632 08 Sep 06 nicklas 96   public void close()
2632 08 Sep 06 nicklas 97   {
2634 12 Sep 06 nicklas 98     if (listener == null) return;
2641 14 Sep 06 nicklas 99     if (logger != null) logger.info("Stopping listener on port " + port);
2632 08 Sep 06 nicklas 100     listener.interrupt();
2632 08 Sep 06 nicklas 101     listener = null;
2632 08 Sep 06 nicklas 102   }
2632 08 Sep 06 nicklas 103
2634 12 Sep 06 nicklas 104   /**
2634 12 Sep 06 nicklas 105      Is a listener active or not for this server.
2634 12 Sep 06 nicklas 106     @return TRUE if a listener is active, FALSE otherwise
2634 12 Sep 06 nicklas 107   */
2634 12 Sep 06 nicklas 108   public boolean isListening()
2632 08 Sep 06 nicklas 109   {
2634 12 Sep 06 nicklas 110     return listener != null;
2632 08 Sep 06 nicklas 111   }
2632 08 Sep 06 nicklas 112   
2634 12 Sep 06 nicklas 113   /**
2634 12 Sep 06 nicklas 114     This class is used for listening to the specified socket for
2634 12 Sep 06 nicklas 115     incoming connections. Each request is forwarded to the 
2634 12 Sep 06 nicklas 116     <code>RequestHandler</code> in a separate thread.
2634 12 Sep 06 nicklas 117   */
2634 12 Sep 06 nicklas 118   private static class ListenerThread
2632 08 Sep 06 nicklas 119     implements Runnable
2632 08 Sep 06 nicklas 120   {
2634 12 Sep 06 nicklas 121     
2634 12 Sep 06 nicklas 122     private final ServerSocketChannel socket;
2634 12 Sep 06 nicklas 123     private final RequestHandler requestHandler;
2641 14 Sep 06 nicklas 124     private final Logger logger;
2634 12 Sep 06 nicklas 125     
2641 14 Sep 06 nicklas 126     private ListenerThread(ServerSocketChannel socket, RequestHandler requestHandler, Logger logger)
2634 12 Sep 06 nicklas 127     {
2634 12 Sep 06 nicklas 128       this.socket = socket;
2634 12 Sep 06 nicklas 129       this.requestHandler = requestHandler;
2641 14 Sep 06 nicklas 130       this.logger = logger;
2634 12 Sep 06 nicklas 131     }
2634 12 Sep 06 nicklas 132     
2634 12 Sep 06 nicklas 133     /*
2634 12 Sep 06 nicklas 134       From the Runnable interface
2634 12 Sep 06 nicklas 135       -------------------------------------------
2634 12 Sep 06 nicklas 136     */
6127 14 Sep 12 nicklas 137     @Override
2632 08 Sep 06 nicklas 138     public void run()
2632 08 Sep 06 nicklas 139     {
2634 12 Sep 06 nicklas 140       // We listen until someone tells us not to!
2634 12 Sep 06 nicklas 141       boolean interrupted = false;
2634 12 Sep 06 nicklas 142       while (!interrupted)
2632 08 Sep 06 nicklas 143       {
2632 08 Sep 06 nicklas 144         try
2632 08 Sep 06 nicklas 145         {
2634 12 Sep 06 nicklas 146           SocketChannel incoming = socket.accept();
2641 14 Sep 06 nicklas 147           if (logger != null && logger.isDebugEnabled()) 
2641 14 Sep 06 nicklas 148           {
2641 14 Sep 06 nicklas 149             logger.debug("Accepted incoming connection from: " + 
2641 14 Sep 06 nicklas 150               incoming.socket().getInetAddress().toString());
2641 14 Sep 06 nicklas 151           }
2641 14 Sep 06 nicklas 152           Thread request = new Thread(new RequestHandlerThread(incoming.socket(), 
2641 14 Sep 06 nicklas 153               requestHandler, logger), "RequestHandlerThread." + this.toString());
2632 08 Sep 06 nicklas 154           request.start();
2634 12 Sep 06 nicklas 155           interrupted = Thread.interrupted();
2632 08 Sep 06 nicklas 156         }
2634 12 Sep 06 nicklas 157         catch (ClosedByInterruptException ex)
2634 12 Sep 06 nicklas 158         {
2641 14 Sep 06 nicklas 159           if (logger != null) logger.info("Listener service was interrupted by another thread");
2634 12 Sep 06 nicklas 160           interrupted = true;
2634 12 Sep 06 nicklas 161         }
2632 08 Sep 06 nicklas 162         catch (Throwable t)
2632 08 Sep 06 nicklas 163         {
2641 14 Sep 06 nicklas 164           if (logger != null) logger.error(t.getMessage(), t);
2641 14 Sep 06 nicklas 165           interrupted = true;
2632 08 Sep 06 nicklas 166         }
2632 08 Sep 06 nicklas 167       }
2641 14 Sep 06 nicklas 168       if (logger != null) logger.info("Closing socket on port " + socket.socket().getLocalPort());
2634 12 Sep 06 nicklas 169       SocketUtil.close(socket);
2632 08 Sep 06 nicklas 170     }
2634 12 Sep 06 nicklas 171     // -------------------------------------------
2632 08 Sep 06 nicklas 172   }
2632 08 Sep 06 nicklas 173   
2634 12 Sep 06 nicklas 174   /**
2634 12 Sep 06 nicklas 175     This class is used for forwarding requests to a {@link RequestHandler}.
2634 12 Sep 06 nicklas 176     For each accepted request a new thread is created befor the request
2634 12 Sep 06 nicklas 177     is forwarded to the <code>RequestHandler</code>.
2634 12 Sep 06 nicklas 178   */
2634 12 Sep 06 nicklas 179   private static class RequestHandlerThread
2632 08 Sep 06 nicklas 180     implements Runnable
2632 08 Sep 06 nicklas 181   {
2632 08 Sep 06 nicklas 182     private final Socket incoming;
2634 12 Sep 06 nicklas 183     private final RequestHandler requestHandler;
2641 14 Sep 06 nicklas 184     private final Logger logger;
2634 12 Sep 06 nicklas 185     
2641 14 Sep 06 nicklas 186     private RequestHandlerThread(Socket incoming, RequestHandler requestHandler, Logger logger)
2632 08 Sep 06 nicklas 187     {
2632 08 Sep 06 nicklas 188       this.incoming = incoming;
2634 12 Sep 06 nicklas 189       this.requestHandler = requestHandler;
2641 14 Sep 06 nicklas 190       this.logger = logger;
2632 08 Sep 06 nicklas 191     }
2632 08 Sep 06 nicklas 192     
2634 12 Sep 06 nicklas 193     /*
2634 12 Sep 06 nicklas 194       From the Runnable interface
2634 12 Sep 06 nicklas 195       -------------------------------------------
2634 12 Sep 06 nicklas 196     */
6127 14 Sep 12 nicklas 197     @Override
2632 08 Sep 06 nicklas 198     public void run()
2632 08 Sep 06 nicklas 199     {
2632 08 Sep 06 nicklas 200       try
2632 08 Sep 06 nicklas 201       {
2634 12 Sep 06 nicklas 202         String cmd = SocketUtil.read(incoming, true);
2634 12 Sep 06 nicklas 203         String answer = null;
2634 12 Sep 06 nicklas 204         try
2634 12 Sep 06 nicklas 205         {
2634 12 Sep 06 nicklas 206           answer = requestHandler.handleCmd(incoming, cmd);
2634 12 Sep 06 nicklas 207         }
2634 12 Sep 06 nicklas 208         catch (Throwable t)
2634 12 Sep 06 nicklas 209         {
3992 22 Nov 07 nicklas 210           answer = t.getMessage() + " [" + t.getClass().getName() + "]";
2641 14 Sep 06 nicklas 211           if (logger != null) logger.error("Eception in request handler "+ requestHandler + 
2641 14 Sep 06 nicklas 212             ": " + t.getMessage(), t);
2634 12 Sep 06 nicklas 213         }
2634 12 Sep 06 nicklas 214         SocketUtil.send(incoming, answer, true);
2632 08 Sep 06 nicklas 215         incoming.close();
2632 08 Sep 06 nicklas 216       }
2632 08 Sep 06 nicklas 217       catch (Throwable t)
2632 08 Sep 06 nicklas 218       {
2641 14 Sep 06 nicklas 219         if (logger != null) logger.error(t.getMessage(), t);
2632 08 Sep 06 nicklas 220       }
2632 08 Sep 06 nicklas 221     }
2634 12 Sep 06 nicklas 222     // -------------------------------------------
2632 08 Sep 06 nicklas 223     
2632 08 Sep 06 nicklas 224   }
2632 08 Sep 06 nicklas 225
2632 08 Sep 06 nicklas 226   
2632 08 Sep 06 nicklas 227   
2632 08 Sep 06 nicklas 228 }