1 package org.apache.commons.jcs3.utils.discovery;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.net.Inet4Address;
24 import java.net.Inet6Address;
25 import java.net.InetAddress;
26 import java.net.NetworkInterface;
27 import java.net.UnknownHostException;
28 import java.util.ArrayList;
29 import java.util.Enumeration;
30 import java.util.HashSet;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.concurrent.CopyOnWriteArraySet;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39
40 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
41 import org.apache.commons.jcs3.engine.behavior.IRequireScheduler;
42 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
43 import org.apache.commons.jcs3.log.Log;
44 import org.apache.commons.jcs3.log.LogManager;
45 import org.apache.commons.jcs3.utils.discovery.behavior.IDiscoveryListener;
46 import org.apache.commons.jcs3.utils.net.HostNameUtil;
47 import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
48
49
50
51
52
53
54
55
56
57
58 public class UDPDiscoveryService
59 implements IShutdownObserver, IRequireScheduler
60 {
61
62 private static final Log log = LogManager.getLog( UDPDiscoveryService.class );
63
64
65 private Thread udpReceiverThread;
66
67
68 private UDPDiscoveryReceiver receiver;
69
70
71 private UDPDiscoveryAttributes udpDiscoveryAttributes;
72
73
74 private final IElementSerializer serializer;
75
76
77 private final AtomicBoolean shutdown = new AtomicBoolean(false);
78
79
80 private final ConcurrentMap<Integer, DiscoveredService> discoveredServices =
81 new ConcurrentHashMap<>();
82
83
84 private final Set<String> cacheNames = new CopyOnWriteArraySet<>();
85
86
87 private final Set<IDiscoveryListener> discoveryListeners = new CopyOnWriteArraySet<>();
88
89
90 private ScheduledFuture<?> broadcastTaskFuture;
91
92
93 private ScheduledFuture<?> cleanupTaskFuture;
94
95
96
97
98
99
100
101 @Deprecated
102 public UDPDiscoveryService(final UDPDiscoveryAttributes attributes)
103 {
104 this(attributes, new StandardSerializer());
105 }
106
107
108
109
110
111
112
113
114 public UDPDiscoveryService(final UDPDiscoveryAttributes attributes, IElementSerializer serializer)
115 {
116 this.udpDiscoveryAttributes = attributes.clone();
117 this.serializer = serializer;
118
119 try
120 {
121 InetAddress multicastAddress = InetAddress.getByName(
122 getUdpDiscoveryAttributes().getUdpDiscoveryAddr());
123
124
125 if (getUdpDiscoveryAttributes().getServiceAddress() == null ||
126 getUdpDiscoveryAttributes().getServiceAddress().isEmpty())
127 {
128
129 NetworkInterface serviceInterface = null;
130 if (getUdpDiscoveryAttributes().getUdpDiscoveryInterface() != null)
131 {
132 serviceInterface = NetworkInterface.getByName(
133 getUdpDiscoveryAttributes().getUdpDiscoveryInterface());
134 }
135 else
136 {
137 serviceInterface = HostNameUtil.getMulticastNetworkInterface();
138 }
139
140 try
141 {
142 InetAddress serviceAddress = null;
143
144 for (Enumeration<InetAddress> addresses = serviceInterface.getInetAddresses();
145 addresses.hasMoreElements();)
146 {
147 serviceAddress = addresses.nextElement();
148
149 if (multicastAddress instanceof Inet6Address)
150 {
151 if (serviceAddress instanceof Inet6Address &&
152 !serviceAddress.isLoopbackAddress() &&
153 !serviceAddress.isMulticastAddress() &&
154 serviceAddress.isLinkLocalAddress())
155 {
156
157 break;
158 }
159 }
160 else
161 {
162 if (serviceAddress instanceof Inet4Address &&
163 !serviceAddress.isLoopbackAddress() &&
164 !serviceAddress.isMulticastAddress() &&
165 serviceAddress.isSiteLocalAddress())
166 {
167
168 break;
169 }
170 }
171 }
172
173 if (serviceAddress == null)
174 {
175
176 serviceAddress = HostNameUtil.getLocalHostLANAddress();
177 }
178
179 getUdpDiscoveryAttributes().setServiceAddress(serviceAddress.getHostAddress());
180 }
181 catch ( final UnknownHostException e )
182 {
183 log.error( "Couldn't get local host address", e );
184 }
185 }
186
187
188 receiver = new UDPDiscoveryReceiver( this,
189 getUdpDiscoveryAttributes().getUdpDiscoveryInterface(),
190 multicastAddress,
191 getUdpDiscoveryAttributes().getUdpDiscoveryPort() );
192 }
193 catch ( final IOException e )
194 {
195 log.error( "Problem creating UDPDiscoveryReceiver, address [{0}] "
196 + "port [{1}] we won't be able to find any other caches",
197 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
198 getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
199 }
200
201
202 initiateBroadcast();
203 }
204
205
206
207
208 @Override
209 public void setScheduledExecutorService(final ScheduledExecutorService scheduledExecutor)
210 {
211 this.broadcastTaskFuture = scheduledExecutor.scheduleAtFixedRate(
212 this::serviceRequestBroadcast, 0, 15, TimeUnit.SECONDS);
213
214
215
216
217
218 this.cleanupTaskFuture = scheduledExecutor.scheduleAtFixedRate(
219 this::cleanup, 0,
220 getUdpDiscoveryAttributes().getMaxIdleTimeSec(), TimeUnit.SECONDS);
221 }
222
223
224
225
226
227
228
229 protected void cleanup()
230 {
231 final long now = System.currentTimeMillis();
232
233
234 getDiscoveredServices().stream()
235 .filter(service -> {
236 if (now - service.getLastHearFromTime() > getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000)
237 {
238 log.info( "Removing service, since we haven't heard from it in "
239 + "{0} seconds. service = {1}",
240 getUdpDiscoveryAttributes().getMaxIdleTimeSec(), service );
241 return true;
242 }
243
244 return false;
245 })
246
247
248 .forEach(this::removeDiscoveredService);
249 }
250
251
252
253
254
255
256 public void initiateBroadcast()
257 {
258 log.debug( "Creating sender for discoveryAddress = [{0}] and "
259 + "discoveryPort = [{1}] myHostName = [{2}] and port = [{3}]",
260 () -> getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
261 () -> getUdpDiscoveryAttributes().getUdpDiscoveryPort(),
262 () -> getUdpDiscoveryAttributes().getServiceAddress(),
263 () -> getUdpDiscoveryAttributes().getServicePort() );
264
265 try (UDPDiscoverySender sender = new UDPDiscoverySender(
266 getUdpDiscoveryAttributes(), getSerializer()))
267 {
268 sender.requestBroadcast();
269
270 log.debug( "Sent a request broadcast to the group" );
271 }
272 catch ( final IOException e )
273 {
274 log.error( "Problem sending a Request Broadcast", e );
275 }
276 }
277
278
279
280
281
282
283
284 protected void serviceRequestBroadcast()
285 {
286
287
288 try (UDPDiscoverySender sender = new UDPDiscoverySender(
289 getUdpDiscoveryAttributes(), getSerializer()))
290 {
291 sender.passiveBroadcast(
292 getUdpDiscoveryAttributes().getServiceAddress(),
293 getUdpDiscoveryAttributes().getServicePort(),
294 this.getCacheNames() );
295
296 log.debug( "Called sender to issue a passive broadcast" );
297 }
298 catch ( final IOException e )
299 {
300 log.error( "Problem calling the UDP Discovery Sender, address [{0}] "
301 + "port [{1}]",
302 getUdpDiscoveryAttributes().getUdpDiscoveryAddr(),
303 getUdpDiscoveryAttributes().getUdpDiscoveryPort(), e );
304 }
305 }
306
307
308
309
310
311
312 protected void shutdownBroadcast()
313 {
314
315
316 try (UDPDiscoverySender sender = new UDPDiscoverySender(
317 getUdpDiscoveryAttributes(), getSerializer()))
318 {
319 sender.removeBroadcast(
320 getUdpDiscoveryAttributes().getServiceAddress(),
321 getUdpDiscoveryAttributes().getServicePort(),
322 this.getCacheNames() );
323
324 log.debug( "Called sender to issue a remove broadcast in shutdown." );
325 }
326 catch ( final IOException e )
327 {
328 log.error( "Problem calling the UDP Discovery Sender", e );
329 }
330 }
331
332
333
334
335
336
337 public void addParticipatingCacheName( final String cacheName )
338 {
339 cacheNames.add( cacheName );
340 }
341
342
343
344
345
346
347 public void removeDiscoveredService( final DiscoveredService service )
348 {
349 if (discoveredServices.remove(service.hashCode()) != null)
350 {
351 log.info( "Removing {0}", service );
352 }
353
354 getDiscoveryListeners().forEach(listener -> listener.removeDiscoveredService(service));
355 }
356
357
358
359
360
361
362 protected void addOrUpdateService( final DiscoveredService discoveredService )
363 {
364
365
366 discoveredServices.merge(discoveredService.hashCode(), discoveredService, (oldService, newService) -> {
367 log.debug( "Set contains service." );
368 log.debug( "Updating service in the set {0}", newService );
369
370
371
372 if (!oldService.getCacheNames().equals(newService.getCacheNames()))
373 {
374 log.info( "List of cache names changed for service: {0}", newService );
375
376
377 return newService;
378 }
379
380 if (oldService.getLastHearFromTime() != newService.getLastHearFromTime())
381 {
382 return newService;
383 }
384
385 return oldService;
386 });
387
388
389
390
391
392 getDiscoveryListeners().forEach(listener -> listener.addDiscoveredService(discoveredService));
393 }
394
395
396
397
398
399
400 protected ArrayList<String> getCacheNames()
401 {
402 return new ArrayList<>(cacheNames);
403 }
404
405
406
407
408 public void setUdpDiscoveryAttributes( final UDPDiscoveryAttributes attr )
409 {
410 this.udpDiscoveryAttributes = attr;
411 }
412
413
414
415
416 public UDPDiscoveryAttributes getUdpDiscoveryAttributes()
417 {
418 return this.udpDiscoveryAttributes;
419 }
420
421
422
423
424
425
426
427 public IElementSerializer getSerializer()
428 {
429 return serializer;
430 }
431
432
433
434
435 public void startup()
436 {
437 udpReceiverThread = new Thread(receiver);
438 udpReceiverThread.setDaemon(true);
439
440 udpReceiverThread.start();
441 }
442
443
444
445
446 @Override
447 public void shutdown()
448 {
449 if (shutdown.compareAndSet(false, true))
450 {
451
452 if (broadcastTaskFuture != null)
453 {
454 broadcastTaskFuture.cancel(false);
455 }
456 if (cleanupTaskFuture != null)
457 {
458 cleanupTaskFuture.cancel(false);
459 }
460
461 if (receiver != null)
462 {
463 log.info( "Shutting down UDP discovery service receiver." );
464 receiver.shutdown();
465 }
466
467 log.info( "Shutting down UDP discovery service sender." );
468
469
470 shutdownBroadcast();
471 }
472 else
473 {
474 log.debug( "Shutdown already called." );
475 }
476 }
477
478
479
480
481 public Set<DiscoveredService> getDiscoveredServices()
482 {
483 return new HashSet<>(discoveredServices.values());
484 }
485
486
487
488
489 private Set<IDiscoveryListener> getDiscoveryListeners()
490 {
491 return discoveryListeners;
492 }
493
494
495
496
497 public Set<IDiscoveryListener> getCopyOfDiscoveryListeners()
498 {
499 return new HashSet<>(getDiscoveryListeners());
500 }
501
502
503
504
505
506
507
508 public boolean addDiscoveryListener( final IDiscoveryListener listener )
509 {
510 return getDiscoveryListeners().add( listener );
511 }
512
513
514
515
516
517
518
519 public boolean removeDiscoveryListener( final IDiscoveryListener listener )
520 {
521 return getDiscoveryListeners().remove( listener );
522 }
523 }