1
2
3
4
5 package org.sourceforge.jemmrpc.server;
6
7 import java.io.IOException;
8 import java.net.ServerSocket;
9 import java.net.Socket;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.Map;
13 import java.util.Set;
14 import java.util.concurrent.CountDownLatch;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17
18 import org.apache.log4j.Logger;
19 import org.sourceforge.jemmrpc.example.echo.EchoClient;
20 import org.sourceforge.jemmrpc.example.echo.EchoServer;
21 import org.sourceforge.jemmrpc.shared.IFUtilities;
22 import org.sourceforge.jemmrpc.shared.Message;
23 import org.sourceforge.jemmrpc.shared.RPCHandler;
24
25
26
27
28
29
30
31
32
33
34 public class RPCServer extends Thread
35 {
36 protected static final Logger logger = Logger.getLogger(RPCServer.class);
37
38
39 protected Map<ClientId, ServerThread> clientThreads = new HashMap<ClientId, ServerThread>();
40
41
42 protected CountDownLatch initialisationLatch = new CountDownLatch(1);
43
44
45 protected boolean closing = false;
46
47
48 protected ExecutorService msgProccessingThreadPool;
49
50
51 protected int port;
52
53
54 protected ServerSocket serverSocket;
55
56
57
58
59
60 HashMap<Class<?>, Object> serverIFMap = new HashMap<Class<?>, Object>();
61
62 ClientConnectionListener clientConnListener;
63
64
65
66
67
68
69
70 public ClientId getClientId()
71 {
72 return (ClientId) RPCHandler.getConnectionId();
73 }
74
75
76
77
78
79
80
81
82 public Object getClientIF(ClientId clientId, Class<?> ifClass)
83 {
84 final ServerThread st = clientThreads.get(clientId);
85 return st.getClientIF(ifClass);
86 }
87
88
89
90
91
92
93
94 public RPCServer(int port, int noProcessingThreads)
95 {
96 this.port = port;
97 msgProccessingThreadPool = Executors.newFixedThreadPool(noProcessingThreads);
98 }
99
100
101
102
103
104
105
106 public RPCServer(int port, ExecutorService executorService)
107 {
108 this.port = port;
109 msgProccessingThreadPool = executorService;
110 }
111
112
113
114
115
116
117
118 public void clientDisconnected(final ClientId clientId)
119 {
120 logger.info("client " + clientId + " disconnected");
121
122 final ClientConnectionListener listener = this.clientConnListener;
123 if (listener != null)
124 listener.clientDisconnected(clientId);
125
126 synchronized (clientThreads)
127 {
128 clientThreads.remove(clientId);
129 }
130 }
131
132
133
134
135
136
137
138 public void processMessage(final ClientId clientId, final Message message)
139 {
140 }
141
142
143
144
145
146
147
148 public void waitForInitialisation()
149 {
150 try
151 {
152 initialisationLatch.await();
153 } catch (final InterruptedException e)
154 {
155 e.printStackTrace();
156 }
157 }
158
159 @Override
160 public void run()
161 {
162 try
163 {
164 logger.info("Server started");
165 serverSocket = new ServerSocket(port);
166 logger.info("Listening on port: " + port);
167 initialisationLatch.countDown();
168 while (true)
169 {
170 final Socket s = serverSocket.accept();
171 final ClientId clientId = new ClientId();
172 logger.info("connection from " + s.getInetAddress() + ":" + s.getPort()
173 + " assigned id=" + clientId);
174 final ServerThread st = new ServerThread(this, s, clientId, serverIFMap,
175 msgProccessingThreadPool);
176 synchronized (clientThreads)
177 {
178 clientThreads.put(clientId, st);
179 }
180
181
182 st.start();
183 }
184 } catch (final IOException e)
185 {
186 if (!closing)
187 logger.error("Unexpected exception whilst accepting socket connections",
188 e);
189 }
190
191 logger.info("Server terminated");
192 }
193
194
195
196
197 public void shutdown()
198 {
199 closing = true;
200 try
201 {
202 logger.info("Shutting down");
203
204
205 serverSocket.close();
206 msgProccessingThreadPool.shutdown();
207 final Set<ServerThread> toShutdown = new HashSet<ServerThread>();
208 toShutdown.addAll(clientThreads.values());
209
210
211 logger.info("Notifying client threads");
212 for (final ServerThread clientThread : toShutdown)
213 clientThread.shutdown();
214
215 for (final ServerThread clientThread : toShutdown)
216 clientThread.waitForShutdown();
217
218 logger.info("Client threads complete");
219 } catch (final IOException ioe)
220 {
221 logger.warn("Exception thrown whilst terminating server", ioe);
222 }
223 logger.info("Shutdown complete");
224
225 }
226
227
228
229
230
231
232
233 public void registerInterface(Class<?> targetIF, Object targetIFImpl)
234 {
235 IFUtilities.validateInterface(targetIF);
236
237 if (!targetIF.isInstance(targetIFImpl))
238 throw new IllegalArgumentException("Target impl not instance of interface");
239
240 serverIFMap.put(targetIF, targetIFImpl);
241 }
242
243
244
245
246
247
248 public void setClientListener(ClientConnectionListener listener)
249 {
250 this.clientConnListener = listener;
251 }
252
253
254
255
256
257
258 public void notifyNewClient(ClientId clientId, String clientHostname, int clientPort)
259 {
260 final ClientConnectionListener listener = this.clientConnListener;
261 if (listener != null)
262 listener.clientConnected(clientId, clientHostname, clientPort);
263 else
264 logger.debug("No listener set for new client notifications");
265 }
266 }