View Javadoc

1   /*
2    * Created on 17 Nov 2007
3    * 
4    */
5   package org.sourceforge.jemmrpc.server;
6   
7   import java.io.IOException;
8   import java.net.ServerSocket;
9   import java.net.Socket;
10  import java.util.HashMap;
11  import java.util.HashSet;
12  import java.util.Map;
13  import java.util.Set;
14  import java.util.concurrent.CountDownLatch;
15  import java.util.concurrent.ExecutorService;
16  import java.util.concurrent.Executors;
17  
18  import org.apache.log4j.Logger;
19  import org.sourceforge.jemmrpc.example.echo.EchoClient;
20  import org.sourceforge.jemmrpc.example.echo.EchoServer;
21  import org.sourceforge.jemmrpc.shared.IFUtilities;
22  import org.sourceforge.jemmrpc.shared.Message;
23  import org.sourceforge.jemmrpc.shared.RPCHandler;
24  
25  /**
26   * The RPCServer offers RPC interfaces to connecting clients. The server can handle multiple client
27   * connections.
28   * 
29   * See {@link EchoClient}/{@link EchoServer} for an example of usage.
30   * 
31   * @author Rory Graves
32   * 
33   */
34  public class RPCServer extends Thread
35  {
36      protected static final Logger logger = Logger.getLogger(RPCServer.class);
37  
38      /** The map of clientIds to active clients */
39      protected Map<ClientId, ServerThread> clientThreads = new HashMap<ClientId, ServerThread>();
40  
41      /** A latch to ensure server is ready when initialisation call returns. */
42      protected CountDownLatch initialisationLatch = new CountDownLatch(1);
43  
44      /** Used for graceful shutdown. */
45      protected boolean closing = false;
46  
47      /** The thread pool for processing client call requests (shared among all clients) */
48      protected ExecutorService msgProccessingThreadPool;
49  
50      /** The port the server listens on. */
51      protected int port;
52  
53      /** The server socket which listens for client connections. */
54      protected ServerSocket serverSocket;
55  
56      /**
57       * A map of the interfaces implemented by the server and their corresponding interface
58       * implementations.
59       */
60      HashMap<Class<?>, Object> serverIFMap = new HashMap<Class<?>, Object>();
61  
62      ClientConnectionListener clientConnListener;
63  
64      /**
65       * Used by called methods to find out ID of calling client. This uses a ThreadLocal which is set
66       * before the server method is invoked.
67       * 
68       * @return The id of the caller.
69       */
70      public ClientId getClientId()
71      {
72          return (ClientId) RPCHandler.getConnectionId();
73      }
74  
75      /**
76       * Retrieve a proxy for the given interface on the given client.
77       * 
78       * @param clientId The id of the client.
79       * @param ifClass The required interface class.
80       * @return An object proxying the interface to the given client.
81       */
82      public Object getClientIF(ClientId clientId, Class<?> ifClass)
83      {
84          final ServerThread st = clientThreads.get(clientId);
85          return st.getClientIF(ifClass);
86      }
87  
88      /**
89       * Creates an RPCServer instance
90       * 
91       * @param port The socket to listen for connections on
92       * @param noProcessingThreads The number of processing threads.
93       */
94      public RPCServer(int port, int noProcessingThreads)
95      {
96          this.port = port;
97          msgProccessingThreadPool = Executors.newFixedThreadPool(noProcessingThreads);
98      }
99  
100     /**
101      * Creates an RPCServer instance
102      * 
103      * @param port The socket to listen for connections on
104      * @param executorService The executor used to service requests.
105      */
106     public RPCServer(int port, ExecutorService executorService)
107     {
108         this.port = port;
109         msgProccessingThreadPool = executorService;
110     }
111 
112     /**
113      * Notification that a given client has disconnected. The client is removed from the active
114      * pool.
115      * 
116      * @param clientId The assigned id of the disconnected client.
117      */
118     public void clientDisconnected(final ClientId clientId)
119     {
120         logger.info("client " + clientId + " disconnected");
121 
122         final ClientConnectionListener listener = this.clientConnListener;
123         if (listener != null)
124             listener.clientDisconnected(clientId);
125 
126         synchronized (clientThreads)
127         {
128             clientThreads.remove(clientId);
129         }
130     }
131 
132     /**
133      * Process an incoming message.
134      * 
135      * @param clientId The ID of the sending client.
136      * @param message The received message.
137      */
138     public void processMessage(final ClientId clientId, final Message message)
139     {
140     }
141 
142     /**
143      * This method will pause the caller until the server is ready to start accepting connections.
144      * If the server is already running it will return immediately. Mostly used for testing when a
145      * client and server are run in the same VM to avoid the client connecting before the server is
146      * ready.
147      */
148     public void waitForInitialisation()
149     {
150         try
151         {
152             initialisationLatch.await();
153         } catch (final InterruptedException e)
154         {
155             e.printStackTrace();
156         }
157     }
158 
159     @Override
160     public void run()
161     {
162         try
163         {
164             logger.info("Server started");
165             serverSocket = new ServerSocket(port);
166             logger.info("Listening on port: " + port);
167             initialisationLatch.countDown();
168             while (true)
169             {
170                 final Socket s = serverSocket.accept();
171                 final ClientId clientId = new ClientId();
172                 logger.info("connection from " + s.getInetAddress() + ":" + s.getPort()
173                         + " assigned id=" + clientId);
174                 final ServerThread st = new ServerThread(this, s, clientId, serverIFMap,
175                         msgProccessingThreadPool);
176                 synchronized (clientThreads)
177                 {
178                     clientThreads.put(clientId, st);
179                 }
180                 // starts reading messages after client thread has been added to list to avoid race
181                 // condition.
182                 st.start();
183             }
184         } catch (final IOException e)
185         {
186             if (!closing)
187                 logger.error("Unexpected exception whilst accepting socket connections",
188                         e);
189         }
190 
191         logger.info("Server terminated");
192     }
193 
194     /**
195      * Request by the system for the server to be shutdown.
196      */
197     public void shutdown()
198     {
199         closing = true;
200         try
201         {
202             logger.info("Shutting down");
203 
204             // close the server socket to stop accepting new connections
205             serverSocket.close();
206             msgProccessingThreadPool.shutdown();
207             final Set<ServerThread> toShutdown = new HashSet<ServerThread>();
208             toShutdown.addAll(clientThreads.values());
209 
210             // notify any remaining clients to shutdown
211             logger.info("Notifying client threads");
212             for (final ServerThread clientThread : toShutdown)
213                 clientThread.shutdown();
214 
215             for (final ServerThread clientThread : toShutdown)
216                 clientThread.waitForShutdown();
217 
218             logger.info("Client threads complete");
219         } catch (final IOException ioe)
220         {
221             logger.warn("Exception thrown whilst terminating server", ioe);
222         }
223         logger.info("Shutdown complete");
224 
225     }
226 
227     /**
228      * Register an interface for RPC calls by clients.
229      * 
230      * @param targetIF The interface to offer to clients.
231      * @param targetIFImpl The server side implementation of the given interface.
232      */
233     public void registerInterface(Class<?> targetIF, Object targetIFImpl)
234     {
235         IFUtilities.validateInterface(targetIF);
236 
237         if (!targetIF.isInstance(targetIFImpl))
238             throw new IllegalArgumentException("Target impl not instance of interface");
239 
240         serverIFMap.put(targetIF, targetIFImpl);
241     }
242 
243     /**
244      * Sets the client listener to the given listener.
245      * 
246      * @param listener The listener to receive disconnection events.
247      */
248     public void setClientListener(ClientConnectionListener listener)
249     {
250         this.clientConnListener = listener;
251     }
252 
253     /**
254      * @param clientId The id of the new client.
255      * @param clientHostname The clients hostname.
256      * @param clientPort The clients port.
257      */
258     public void notifyNewClient(ClientId clientId, String clientHostname, int clientPort)
259     {
260         final ClientConnectionListener listener = this.clientConnListener;
261         if (listener != null)
262             listener.clientConnected(clientId, clientHostname, clientPort);
263         else
264             logger.debug("No listener set for new client notifications");
265     }
266 }