1 package org.apache.commons.jcs3.utils.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
49
50
51 public class UDPDiscoveryReceiver
52 implements Runnable, IShutdownObserver
53 {
54
55 private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
56
57
58 private DatagramChannel multicastChannel;
59
60
61 private MembershipKey multicastGroupKey;
62
63
64 private Selector selector;
65
66
67
68
69
70 private static final int maxPoolSize = 2;
71
72
73 private final ExecutorService pooledExecutor;
74
75
76 private final AtomicInteger cnt = new AtomicInteger(0);
77
78
79 private final UDPDiscoveryService service;
80
81
82 private IElementSerializer serializer;
83
84
85 private final AtomicBoolean shutdown = new AtomicBoolean(false);
86
87
88
89
90
91
92
93
94
95
96
97
98 public UDPDiscoveryReceiver( final UDPDiscoveryService service,
99 final String multicastInterfaceString,
100 final String multicastAddressString,
101 final int multicastPort )
102 throws IOException
103 {
104 this(service, multicastInterfaceString,
105 InetAddress.getByName( multicastAddressString ),
106 multicastPort);
107 }
108
109
110
111
112
113
114
115
116
117
118
119 public UDPDiscoveryReceiver( final UDPDiscoveryService service,
120 final String multicastInterfaceString,
121 final InetAddress multicastAddress,
122 final int multicastPort )
123 throws IOException
124 {
125 this.service = service;
126 if (service != null)
127 {
128 this.serializer = service.getSerializer();
129 }
130
131
132 this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
133 new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
134 WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
135 "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
136
137 log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );
138 createSocket( multicastInterfaceString, multicastAddress, multicastPort );
139 }
140
141
142
143
144
145
146
147
148
149 private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
150 final int multicastPort )
151 throws IOException
152 {
153 try
154 {
155
156 NetworkInterface multicastInterface = null;
157 if (multicastInterfaceString != null)
158 {
159 multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
160 }
161 else
162 {
163 multicastInterface = HostNameUtil.getMulticastNetworkInterface();
164 }
165 if (multicastInterface != null)
166 {
167 log.info("Using network interface {0}", multicastInterface::getDisplayName);
168 }
169
170 multicastChannel = DatagramChannel.open(
171 multicastAddress instanceof Inet6Address ?
172 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET)
173 .setOption(StandardSocketOptions.SO_REUSEADDR, true)
174 .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface)
175 .bind(new InetSocketAddress(multicastPort));
176 multicastChannel.configureBlocking(false);
177
178 log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface);
179 multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface);
180
181 selector = Selector.open();
182 multicastChannel.register(selector, SelectionKey.OP_READ);
183 }
184 catch ( final IOException e )
185 {
186 log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
187 multicastPort, e );
188 throw e;
189 }
190 }
191
192 private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
193 new ArrayBlockingQueue<>(maxPoolSize);
194
195
196
197
198
199
200
201
202 @Deprecated
203 public Object waitForMessage()
204 throws IOException
205 {
206 try
207 {
208 return msgQueue.take();
209 }
210 catch (InterruptedException e)
211 {
212 throw new IOException("Interrupted waiting for message", e);
213 }
214 }
215
216
217 @Override
218 public void run()
219 {
220 try
221 {
222 log.debug( "Waiting for message." );
223
224 while (!shutdown.get())
225 {
226 int activeKeys = selector.select();
227 if (activeKeys == 0)
228 {
229 continue;
230 }
231
232 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
233 {
234 if (shutdown.get())
235 {
236 break;
237 }
238
239 SelectionKey key = i.next();
240 i.remove();
241
242 if (!key.isValid())
243 {
244 continue;
245 }
246
247 if (key.isReadable())
248 {
249 cnt.incrementAndGet();
250 log.debug( "{0} messages received.", this::getCnt );
251
252 DatagramChannel mc = (DatagramChannel) key.channel();
253
254 ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
255 InetSocketAddress sourceAddress =
256 (InetSocketAddress) mc.receive(byteBuffer);
257 byteBuffer.flip();
258
259 try
260 {
261 log.debug("Received packet from address [{0}]", sourceAddress);
262 byte[] bytes = new byte[byteBuffer.limit()];
263 byteBuffer.get(bytes);
264 Object obj = serializer.deSerialize(bytes, null);
265
266 if (obj instanceof UDPDiscoveryMessage)
267 {
268
269
270
271 final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
272 msg.setHost(sourceAddress.getHostString());
273
274 log.debug( "Read object from address [{0}], object=[{1}]",
275 sourceAddress, obj );
276
277
278 synchronized (msgQueue)
279 {
280
281 if (msgQueue.remainingCapacity() == 0)
282 {
283
284 msgQueue.remove();
285 }
286
287 msgQueue.add(msg);
288 }
289
290 pooledExecutor.execute(() -> handleMessage(msg));
291 log.debug( "Passed handler to executor." );
292 }
293 }
294 catch ( final IOException | ClassNotFoundException e )
295 {
296 log.error( "Error receiving multicast packet", e );
297 }
298 }
299 }
300 }
301 }
302 catch ( final IOException e )
303 {
304 log.error( "Unexpected exception in UDP receiver.", e );
305 }
306 }
307
308
309
310
311 public void setCnt( final int cnt )
312 {
313 this.cnt.set(cnt);
314 }
315
316
317
318
319 public int getCnt()
320 {
321 return cnt.get();
322 }
323
324
325
326
327
328
329
330 protected void setSerializer(IElementSerializer serializer)
331 {
332 this.serializer = serializer;
333 }
334
335
336
337
338
339 @Deprecated
340 public class MessageHandler
341 implements Runnable
342 {
343
344 private final UDPDiscoveryMessage message;
345
346
347
348
349 public MessageHandler( final UDPDiscoveryMessage message )
350 {
351 this.message = message;
352 }
353
354
355
356
357 @Override
358 public void run()
359 {
360 handleMessage(message);
361 }
362 }
363
364
365
366
367 private void handleMessage(UDPDiscoveryMessage message)
368 {
369
370 if ( message.getRequesterId() == CacheInfo.listenerId )
371 {
372 log.debug( "Ignoring message sent from self" );
373 }
374 else
375 {
376 log.debug( "Process message sent from another" );
377 log.debug( "Message = {0}", message );
378
379 if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
380 {
381 log.debug( "Ignoring invalid message: {0}", message );
382 }
383 else
384 {
385 processMessage(message);
386 }
387 }
388 }
389
390
391
392
393 private void processMessage(UDPDiscoveryMessage message)
394 {
395 final DiscoveredService discoveredService = new DiscoveredService(message);
396
397 switch (message.getMessageType())
398 {
399 case REMOVE:
400 log.debug( "Removing service from set {0}", discoveredService );
401 service.removeDiscoveredService( discoveredService );
402 break;
403 case REQUEST:
404
405
406 log.debug( "Message is a Request Broadcast, will have the service handle it." );
407 service.serviceRequestBroadcast();
408 break;
409 case PASSIVE:
410 default:
411 log.debug( "Adding or updating service to set {0}", discoveredService );
412 service.addOrUpdateService( discoveredService );
413 break;
414 }
415 }
416
417
418 @Override
419 public void shutdown()
420 {
421 if (shutdown.compareAndSet(false, true))
422 {
423 try
424 {
425 selector.close();
426 multicastGroupKey.drop();
427 multicastChannel.close();
428 }
429 catch ( final IOException e )
430 {
431 log.error( "Problem closing socket" );
432 }
433 }
434 }
435 }