View Javadoc
1   package org.apache.commons.jcs3.utils.discovery;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.net.HostNameUtil;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration;
import org.apache.commons.jcs3.utils.threadpool.PoolConfiguration.WhenBlockedPolicy;
import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
49  
50  /** Receives UDP Discovery messages. */
51  public class UDPDiscoveryReceiver
52      implements Runnable, IShutdownObserver
53  {
54      /** The log factory */
55      private static final Log log = LogManager.getLog( UDPDiscoveryReceiver.class );
56  
57      /** The channel used for communication. */
58      private DatagramChannel multicastChannel;
59  
60      /** The group membership key. */
61      private MembershipKey multicastGroupKey;
62  
63      /** The selector. */
64      private Selector selector;
65  
66      /**
67       * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
68       * restriction on the pool size
69       */
70      private static final int maxPoolSize = 2;
71  
72      /** The processor */
73      private final ExecutorService pooledExecutor;
74  
75      /** number of messages received. For debugging and testing. */
76      private final AtomicInteger cnt = new AtomicInteger(0);
77  
78      /** Service to get cache names and handle request broadcasts */
79      private final UDPDiscoveryService service;
80  
81      /** Serializer */
82      private IElementSerializer serializer;
83  
84      /** Is it shutdown. */
85      private final AtomicBoolean shutdown = new AtomicBoolean(false);
86  
87      /**
88       * Constructor for the UDPDiscoveryReceiver object.
89       * <p>
90       * We determine our own host using InetAddress
91       *<p>
92       * @param service
93       * @param multicastInterfaceString
94       * @param multicastAddressString
95       * @param multicastPort
96       * @throws IOException
97       */
98      public UDPDiscoveryReceiver( final UDPDiscoveryService service,
99              final String multicastInterfaceString,
100             final String multicastAddressString,
101             final int multicastPort )
102         throws IOException
103     {
104         this(service, multicastInterfaceString,
105                 InetAddress.getByName( multicastAddressString ),
106                 multicastPort);
107     }
108 
109     /**
110      * Constructor for the UDPDiscoveryReceiver object.
111      * <p>
112      * @param service
113      * @param multicastInterfaceString
114      * @param multicastAddress
115      * @param multicastPort
116      * @throws IOException
117      * @since 3.1
118      */
119     public UDPDiscoveryReceiver( final UDPDiscoveryService service,
120             final String multicastInterfaceString,
121             final InetAddress multicastAddress,
122             final int multicastPort )
123         throws IOException
124     {
125         this.service = service;
126         if (service != null)
127         {
128             this.serializer = service.getSerializer();
129         }
130 
131         // create a small thread pool to handle a barrage
132         this.pooledExecutor = ThreadPoolManager.getInstance().createPool(
133                 new PoolConfiguration(false, 0, maxPoolSize, maxPoolSize, 0,
134                         WhenBlockedPolicy.DISCARDOLDEST, maxPoolSize),
135                 "JCS-UDPDiscoveryReceiver-", Thread.MIN_PRIORITY);
136 
137         log.info( "Constructing listener, [{0}:{1}]", multicastAddress, multicastPort );
138         createSocket( multicastInterfaceString, multicastAddress, multicastPort );
139     }
140 
141     /**
142      * Creates the socket for this class.
143      * <p>
144      * @param multicastInterfaceString
145      * @param multicastAddress
146      * @param multicastPort
147      * @throws IOException
148      */
149     private void createSocket( final String multicastInterfaceString, final InetAddress multicastAddress,
150             final int multicastPort )
151         throws IOException
152     {
153         try
154         {
155             // Use dedicated interface if specified
156             NetworkInterface multicastInterface = null;
157             if (multicastInterfaceString != null)
158             {
159                 multicastInterface = NetworkInterface.getByName(multicastInterfaceString);
160             }
161             else
162             {
163                 multicastInterface = HostNameUtil.getMulticastNetworkInterface();
164             }
165             if (multicastInterface != null)
166             {
167                 log.info("Using network interface {0}", multicastInterface::getDisplayName);
168             }
169 
170             multicastChannel = DatagramChannel.open(
171                     multicastAddress instanceof Inet6Address ?
172                             StandardProtocolFamily.INET6 : StandardProtocolFamily.INET)
173                     .setOption(StandardSocketOptions.SO_REUSEADDR, true)
174                     .setOption(StandardSocketOptions.IP_MULTICAST_IF, multicastInterface)
175                     .bind(new InetSocketAddress(multicastPort));
176             multicastChannel.configureBlocking(false);
177 
178             log.info("Joining Group: [{0}] on {1}", multicastAddress, multicastInterface);
179             multicastGroupKey = multicastChannel.join(multicastAddress, multicastInterface);
180 
181             selector = Selector.open();
182             multicastChannel.register(selector, SelectionKey.OP_READ);
183         }
184         catch ( final IOException e )
185         {
186             log.error( "Could not bind to multicast address [{0}:{1}]", multicastAddress,
187                     multicastPort, e );
188             throw e;
189         }
190     }
191 
192     private final ArrayBlockingQueue<UDPDiscoveryMessage> msgQueue =
193             new ArrayBlockingQueue<>(maxPoolSize);
194 
195     /**
196      * Wait for multicast message
197      *
198      * @return the object message
199      * @throws IOException
200      * @deprecated no longer used
201      */
202     @Deprecated
203     public Object waitForMessage()
204         throws IOException
205     {
206         try
207         {
208             return msgQueue.take();
209         }
210         catch (InterruptedException e)
211         {
212             throw new IOException("Interrupted waiting for message", e);
213         }
214     }
215 
216     /** Main processing method for the UDPDiscoveryReceiver object */
217     @Override
218     public void run()
219     {
220         try
221         {
222             log.debug( "Waiting for message." );
223 
224             while (!shutdown.get())
225             {
226                 int activeKeys = selector.select();
227                 if (activeKeys == 0)
228                 {
229                     continue;
230                 }
231 
232                 for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
233                 {
234                     if (shutdown.get())
235                     {
236                         break;
237                     }
238 
239                     SelectionKey key = i.next();
240                     i.remove();
241 
242                     if (!key.isValid())
243                     {
244                         continue;
245                     }
246 
247                     if (key.isReadable())
248                     {
249                         cnt.incrementAndGet();
250                         log.debug( "{0} messages received.", this::getCnt );
251 
252                         DatagramChannel mc = (DatagramChannel) key.channel();
253 
254                         ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
255                         InetSocketAddress sourceAddress =
256                                 (InetSocketAddress) mc.receive(byteBuffer);
257                         byteBuffer.flip();
258 
259                         try
260                         {
261                             log.debug("Received packet from address [{0}]", sourceAddress);
262                             byte[] bytes = new byte[byteBuffer.limit()];
263                             byteBuffer.get(bytes);
264                             Object obj = serializer.deSerialize(bytes, null);
265 
266                             if (obj instanceof UDPDiscoveryMessage)
267                             {
268                                 // Ensure that the address we're supposed to send to is, indeed, the address
269                                 // of the machine on the other end of this connection.  This guards against
270                                 // instances where we don't exactly get the right local host address
271                                 final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
272                                 msg.setHost(sourceAddress.getHostString());
273 
274                                 log.debug( "Read object from address [{0}], object=[{1}]",
275                                         sourceAddress, obj );
276 
277                                 // Just to keep the functionality of the deprecated waitForMessage method
278                                 synchronized (msgQueue)
279                                 {
280                                     // Check if queue full already?
281                                     if (msgQueue.remainingCapacity() == 0)
282                                     {
283                                         // remove oldest element from queue
284                                         msgQueue.remove();
285                                     }
286 
287                                     msgQueue.add(msg);
288                                 }
289 
290                                 pooledExecutor.execute(() -> handleMessage(msg));
291                                 log.debug( "Passed handler to executor." );
292                             }
293                         }
294                         catch ( final IOException | ClassNotFoundException e )
295                         {
296                             log.error( "Error receiving multicast packet", e );
297                         }
298                     }
299                 }
300             } // end while
301         }
302         catch ( final IOException e )
303         {
304             log.error( "Unexpected exception in UDP receiver.", e );
305         }
306     }
307 
308     /**
309      * @param cnt The cnt to set.
310      */
311     public void setCnt( final int cnt )
312     {
313         this.cnt.set(cnt);
314     }
315 
316     /**
317      * @return Returns the cnt.
318      */
319     public int getCnt()
320     {
321         return cnt.get();
322     }
323 
324     /**
325      * For testing
326      *
327      * @param serializer the serializer to set
328      * @since 3.1
329      */
330     protected void setSerializer(IElementSerializer serializer)
331     {
332         this.serializer = serializer;
333     }
334 
335     /**
336      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
337      * @deprecated No longer used
338      */
339     @Deprecated
340     public class MessageHandler
341         implements Runnable
342     {
343         /** The message to handle. Passed in during construction. */
344         private final UDPDiscoveryMessage message;
345 
346         /**
347          * @param message
348          */
349         public MessageHandler( final UDPDiscoveryMessage message )
350         {
351             this.message = message;
352         }
353 
354         /**
355          * Process the message.
356          */
357         @Override
358         public void run()
359         {
360             handleMessage(message);
361         }
362     }
363 
364     /**
365      * Separate thread run when a command comes into the UDPDiscoveryReceiver.
366      */
367     private void handleMessage(UDPDiscoveryMessage message)
368     {
369         // consider comparing ports here instead.
370         if ( message.getRequesterId() == CacheInfo.listenerId )
371         {
372             log.debug( "Ignoring message sent from self" );
373         }
374         else
375         {
376             log.debug( "Process message sent from another" );
377             log.debug( "Message = {0}", message );
378 
379             if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
380             {
381                 log.debug( "Ignoring invalid message: {0}", message );
382             }
383             else
384             {
385                 processMessage(message);
386             }
387         }
388     }
389 
390     /**
391      * Process the incoming message.
392      */
393     private void processMessage(UDPDiscoveryMessage message)
394     {
395         final DiscoveredService discoveredService = new DiscoveredService(message);
396 
397         switch (message.getMessageType())
398         {
399             case REMOVE:
400                 log.debug( "Removing service from set {0}", discoveredService );
401                 service.removeDiscoveredService( discoveredService );
402                 break;
403             case REQUEST:
404                 // if this is a request message, have the service handle it and
405                 // return
406                 log.debug( "Message is a Request Broadcast, will have the service handle it." );
407                 service.serviceRequestBroadcast();
408                 break;
409             case PASSIVE:
410             default:
411                 log.debug( "Adding or updating service to set {0}", discoveredService );
412                 service.addOrUpdateService( discoveredService );
413                 break;
414         }
415     }
416 
417     /** Shuts down the socket. */
418     @Override
419     public void shutdown()
420     {
421         if (shutdown.compareAndSet(false, true))
422         {
423             try
424             {
425                 selector.close();
426                 multicastGroupKey.drop();
427                 multicastChannel.close();
428             }
429             catch ( final IOException e )
430             {
431                 log.error( "Problem closing socket" );
432             }
433         }
434     }
435 }