1 package org.apache.commons.jcs3.auxiliary.remote.server;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.rmi.RemoteException;
25 import java.rmi.registry.Registry;
26 import java.rmi.server.RMISocketFactory;
27 import java.rmi.server.UnicastRemoteObject;
28 import java.rmi.server.Unreferenced;
29 import java.util.Collections;
30 import java.util.Map;
31 import java.util.Properties;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35
36 import org.apache.commons.jcs3.access.exception.CacheException;
37 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
38 import org.apache.commons.jcs3.auxiliary.remote.server.behavior.IRemoteCacheServer;
39 import org.apache.commons.jcs3.auxiliary.remote.server.behavior.IRemoteCacheServerAttributes;
40 import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType;
41 import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
42 import org.apache.commons.jcs3.engine.CacheListeners;
43 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
44 import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
45 import org.apache.commons.jcs3.engine.behavior.ICacheListener;
46 import org.apache.commons.jcs3.engine.control.CompositeCache;
47 import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
48 import org.apache.commons.jcs3.engine.logging.CacheEvent;
49 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEvent;
50 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
51 import org.apache.commons.jcs3.log.Log;
52 import org.apache.commons.jcs3.log.LogManager;
53 import org.apache.commons.jcs3.utils.timing.ElapsedTimer;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class RemoteCacheServer<K, V>
71 extends UnicastRemoteObject
72 implements IRemoteCacheServer<K, V>, Unreferenced
73 {
74 public static final String DFEAULT_REMOTE_CONFIGURATION_FILE = "/remote.cache.ccf";
75
76
77 private static final long serialVersionUID = -8072345435941473116L;
78
79
80 private static final Log log = LogManager.getLog( RemoteCacheServer.class );
81
82
83 private int puts;
84
85
86 private final transient ConcurrentMap<String, CacheListeners<K, V>> cacheListenersMap =
87 new ConcurrentHashMap<>();
88
89
90 private final transient ConcurrentMap<String, CacheListeners<K, V>> clusterListenersMap =
91 new ConcurrentHashMap<>();
92
93
94 private transient CompositeCacheManager cacheManager;
95
96
97 private final ConcurrentMap<Long, RemoteType> idTypeMap = new ConcurrentHashMap<>();
98
99
100 private final ConcurrentMap<Long, String> idIPMap = new ConcurrentHashMap<>();
101
102
103 private final int[] listenerId = new int[1];
104
105
106
107 final IRemoteCacheServerAttributes remoteCacheServerAttributes;
108
109
110 private static final int logInterval = 100;
111
112
113 private transient ICacheEventLogger cacheEventLogger;
114
115
116
117
118
119
120
121
122
/**
 * Creates the server, handing the configured service port to the {@code UnicastRemoteObject}
 * constructor, and then initializes the cache manager from the given properties.
 *
 * @param rcsa server attributes; its service port is read before anything else
 * @param config properties used to configure the cache manager
 * @throws RemoteException if the superclass constructor or the initialization fails
 */
protected RemoteCacheServer( final IRemoteCacheServerAttributes rcsa, final Properties config )
    throws RemoteException
{
    super( rcsa.getServicePort() );
    remoteCacheServerAttributes = rcsa;
    init( config );
}
130
131
132
133
134
135
136
137
138
139
/**
 * Creates the server with a custom socket factory. The same factory instance is passed to the
 * {@code UnicastRemoteObject} constructor as both the client and the server socket factory.
 *
 * @param rcsa server attributes; its service port is read before anything else
 * @param config properties used to configure the cache manager
 * @param customRMISocketFactory factory used for both socket-factory arguments
 * @throws RemoteException if the superclass constructor or the initialization fails
 */
protected RemoteCacheServer( final IRemoteCacheServerAttributes rcsa, final Properties config,
                             final RMISocketFactory customRMISocketFactory )
    throws RemoteException
{
    super( rcsa.getServicePort(), customRMISocketFactory, customRMISocketFactory );
    remoteCacheServerAttributes = rcsa;
    init( config );
}
147
148
149
150
151
152
153
154 private void init( final Properties prop ) throws RemoteException
155 {
156 try
157 {
158 cacheManager = createCacheManager( prop );
159 }
160 catch (final CacheException e)
161 {
162 throw new RemoteException(e.getMessage(), e);
163 }
164
165
166
167 cacheManager.getCacheNames().forEach(name -> {
168 final CompositeCache<K, V> cache = cacheManager.getCache( name );
169 cacheListenersMap.put( name, new CacheListeners<>( cache ) );
170 });
171 }
172
173
174
175
176
177
178
179
180
181 private static CompositeCacheManager createCacheManager( final Properties prop ) throws CacheException
182 {
183 final CompositeCacheManager hub = CompositeCacheManager.getUnconfiguredInstance();
184 hub.configure( prop );
185 return hub;
186 }
187
188
189
190
191
192
193
194
195
196
197
198 public void put( final ICacheElement<K, V> item )
199 throws IOException
200 {
201 update( item );
202 }
203
204
205
206
207
208 @Override
209 public void update( final ICacheElement<K, V> item )
210 throws IOException
211 {
212 update( item, 0 );
213 }
214
215
216
217
218
219
220
221
222 @Override
223 public void update( final ICacheElement<K, V> item, final long requesterId )
224 throws IOException
225 {
226 final ICacheEvent<ICacheElement<K, V>> cacheEvent = createICacheEvent( item, requesterId, ICacheEventLogger.UPDATE_EVENT );
227 try
228 {
229 processUpdate( item, requesterId );
230 }
231 finally
232 {
233 logICacheEvent( cacheEvent );
234 }
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 private void processUpdate( final ICacheElement<K, V> item, final long requesterId )
261 {
262 final ElapsedTimer timer = new ElapsedTimer();
263 logUpdateInfo( item );
264
265 try
266 {
267 final CacheListeners<K, V> cacheDesc = getCacheListeners( item.getCacheName() );
268 final boolean fromCluster = isRequestFromCluster( requesterId );
269
270 log.debug( "In update, requesterId = [{0}] fromCluster = {1}", requesterId, fromCluster );
271
272
273 synchronized ( cacheDesc )
274 {
275 try
276 {
277 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293 if ( fromCluster )
294 {
295 log.debug( "Put FROM cluster, NOT updating other auxiliaries for region. "
296 + " requesterId [{0}]", requesterId );
297 c.localUpdate( item );
298 }
299 else
300 {
301 log.debug( "Put NOT from cluster, updating other auxiliaries for region. "
302 + " requesterId [{0}]", requesterId );
303 c.update( item );
304 }
305 }
306 catch ( final IOException ce )
307 {
308
309 log.info( "Exception caught updating item. requesterId [{0}]: {1}",
310 requesterId, ce.getMessage() );
311 }
312
313
314
315 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency())
316 {
317 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
318 log.debug( "qlist.length = {0}", qlist.length );
319 for (final ICacheEventQueue<K, V> element : qlist) {
320 element.addPutEvent( item );
321 }
322 }
323 }
324 }
325 catch ( final IOException e )
326 {
327 if ( cacheEventLogger != null )
328 {
329 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.UPDATE_EVENT, e.getMessage()
330 + " REGION: " + item.getCacheName() + " ITEM: " + item );
331 }
332
333 log.error( "Trouble in Update. requesterId [{0}]", requesterId, e );
334 }
335
336
337 log.debug( "put took {0} ms.", timer::getElapsedTime);
338 }
339
340
341
342
343
344
345 private void logUpdateInfo( final ICacheElement<K, V> item )
346 {
347
348 puts++;
349
350 if ( log.isInfoEnabled() && (puts % logInterval == 0) )
351 {
352 log.info( "puts = {0}", puts );
353 }
354
355 log.debug( "In update, put [{0}] in [{1}]",
356 item::getKey, item::getCacheName);
357 }
358
359
360
361
362
363
364
365
366
367
368 @Override
369 public ICacheElement<K, V> get( final String cacheName, final K key )
370 throws IOException
371 {
372 return this.get( cacheName, key, 0 );
373 }
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388 @Override
389 public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
390 throws IOException
391 {
392 ICacheElement<K, V> element = null;
393 final ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.GET_EVENT );
394 try
395 {
396 element = processGet( cacheName, key, requesterId );
397 }
398 finally
399 {
400 logICacheEvent( cacheEvent );
401 }
402 return element;
403 }
404
405
406
407
408
409
410
411
412
413
414
415 private ICacheElement<K, V> processGet( final String cacheName, final K key, final long requesterId )
416 {
417 final boolean fromCluster = isRequestFromCluster( requesterId );
418
419 log.debug( "get [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}",
420 key, cacheName, requesterId, fromCluster );
421
422 final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
423
424 return getFromCacheListeners( key, fromCluster, cacheDesc, null );
425 }
426
427
428
429
430
431
432
433
434
435
436 private ICacheElement<K, V> getFromCacheListeners( final K key, final boolean fromCluster, final CacheListeners<K, V> cacheDesc,
437 final ICacheElement<K, V> element )
438 {
439 ICacheElement<K, V> returnElement = element;
440
441 if ( cacheDesc != null )
442 {
443 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
460 {
461 log.debug( "NonLocalGet. fromCluster [{0}] AllowClusterGet [{1}]",
462 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
463 returnElement = c.get( key );
464 }
465 else
466 {
467
468
469
470 log.debug( "LocalGet. fromCluster [{0}] AllowClusterGet [{1}]",
471 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
472 returnElement = c.localGet( key );
473 }
474 }
475
476 return returnElement;
477 }
478
479
480
481
482
483
484
485
486
487 @Override
488 public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern )
489 throws IOException
490 {
491 return getMatching( cacheName, pattern, 0 );
492 }
493
494
495
496
497
498
499
500
501
502
503 @Override
504 public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
505 throws IOException
506 {
507 final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, pattern, requesterId,
508 ICacheEventLogger.GETMATCHING_EVENT );
509 try
510 {
511 return processGetMatching( cacheName, pattern, requesterId );
512 }
513 finally
514 {
515 logICacheEvent( cacheEvent );
516 }
517 }
518
519
520
521
522
523
524
525
526
527 protected Map<K, ICacheElement<K, V>> processGetMatching( final String cacheName, final String pattern, final long requesterId )
528 {
529 final boolean fromCluster = isRequestFromCluster( requesterId );
530
531 log.debug( "getMatching [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}",
532 pattern, cacheName, requesterId, fromCluster );
533
534 CacheListeners<K, V> cacheDesc = null;
535 try
536 {
537 cacheDesc = getCacheListeners( cacheName );
538 }
539 catch ( final Exception e )
540 {
541 log.error( "Problem getting listeners.", e );
542
543 if ( cacheEventLogger != null )
544 {
545 cacheEventLogger.logError( "RemoteCacheServer", ICacheEventLogger.GETMATCHING_EVENT, e.getMessage()
546 + cacheName + " pattern: " + pattern );
547 }
548 }
549
550 return getMatchingFromCacheListeners( pattern, fromCluster, cacheDesc );
551 }
552
553
554
555
556
557
558
559
560
561 private Map<K, ICacheElement<K, V>> getMatchingFromCacheListeners( final String pattern, final boolean fromCluster, final CacheListeners<K, V> cacheDesc )
562 {
563 Map<K, ICacheElement<K, V>> elements = null;
564 if ( cacheDesc != null )
565 {
566 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
567
568
569
570
571
572 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
573 {
574 log.debug( "NonLocalGetMatching. fromCluster [{0}] AllowClusterGet [{1}]",
575 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
576 elements = c.getMatching( pattern );
577 }
578 else
579 {
580
581
582
583
584 log.debug( "LocalGetMatching. fromCluster [{0}] AllowClusterGet [{1}]",
585 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
586 elements = c.localGetMatching( pattern );
587 }
588 }
589 return elements;
590 }
591
592
593
594
595
596
597
598
599
600
601 @Override
602 public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys )
603 throws IOException
604 {
605 return this.getMultiple( cacheName, keys, 0 );
606 }
607
608
609
610
611
612
613
614
615
616
617
618
619
620 @Override
621 public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
622 throws IOException
623 {
624 final ICacheEvent<Serializable> cacheEvent = createICacheEvent( cacheName, (Serializable) keys, requesterId,
625 ICacheEventLogger.GETMULTIPLE_EVENT );
626 try
627 {
628 return processGetMultiple( cacheName, keys, requesterId );
629 }
630 finally
631 {
632 logICacheEvent( cacheEvent );
633 }
634 }
635
636
637
638
639
640
641
642
643
644
645 private Map<K, ICacheElement<K, V>> processGetMultiple( final String cacheName, final Set<K> keys, final long requesterId )
646 {
647 final boolean fromCluster = isRequestFromCluster( requesterId );
648
649 log.debug( "getMultiple [{0}] from cache [{1}] requesterId = [{2}] fromCluster = {3}",
650 keys, cacheName, requesterId, fromCluster );
651
652 final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
653 return getMultipleFromCacheListeners( keys, null, fromCluster, cacheDesc );
654 }
655
656
657
658
659
660
661
662
663
664 private boolean isRequestFromCluster( final long requesterId )
665 {
666 final RemoteType remoteTypeL = idTypeMap.get( Long.valueOf( requesterId ) );
667 return remoteTypeL == RemoteType.CLUSTER;
668 }
669
670
671
672
673
674
675
676
677
678
679 private Map<K, ICacheElement<K, V>> getMultipleFromCacheListeners( final Set<K> keys, final Map<K, ICacheElement<K, V>> elements, final boolean fromCluster, final CacheListeners<K, V> cacheDesc )
680 {
681 Map<K, ICacheElement<K, V>> returnElements = elements;
682
683 if ( cacheDesc != null )
684 {
685 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701 if ( !fromCluster && this.remoteCacheServerAttributes.isAllowClusterGet() )
702 {
703 log.debug( "NonLocalGetMultiple. fromCluster [{0}] AllowClusterGet [{1}]",
704 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
705
706 returnElements = c.getMultiple( keys );
707 }
708 else
709 {
710
711
712
713
714 log.debug( "LocalGetMultiple. fromCluster [{0}] AllowClusterGet [{1}]",
715 fromCluster, this.remoteCacheServerAttributes.isAllowClusterGet() );
716
717 returnElements = c.localGetMultiple( keys );
718 }
719 }
720
721 return returnElements;
722 }
723
724
725
726
727
728
729
730 @Override
731 public Set<K> getKeySet(final String cacheName) throws IOException
732 {
733 return processGetKeySet( cacheName );
734 }
735
736
737
738
739
740
741
742 protected Set<K> processGetKeySet( final String cacheName )
743 {
744 final CacheListeners<K, V> cacheDesc = getCacheListeners( cacheName );
745
746 if ( cacheDesc == null )
747 {
748 return Collections.emptySet();
749 }
750
751 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
752 return c.getKeySet();
753 }
754
755
756
757
758
759
760
761
762 @Override
763 public void remove( final String cacheName, final K key )
764 throws IOException
765 {
766 remove( cacheName, key, 0 );
767 }
768
769
770
771
772
773
774
775
776
777
778
779 @Override
780 public void remove( final String cacheName, final K key, final long requesterId )
781 throws IOException
782 {
783 final ICacheEvent<K> cacheEvent = createICacheEvent( cacheName, key, requesterId, ICacheEventLogger.REMOVE_EVENT );
784 try
785 {
786 processRemove( cacheName, key, requesterId );
787 }
788 finally
789 {
790 logICacheEvent( cacheEvent );
791 }
792 }
793
794
795
796
797
798
799
800
801
802 private void processRemove( final String cacheName, final K key, final long requesterId )
803 throws IOException
804 {
805 log.debug( "remove [{0}] from cache [{1}]", key, cacheName );
806
807 final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
808
809 final boolean fromCluster = isRequestFromCluster( requesterId );
810
811 if ( cacheDesc != null )
812 {
813
814
815 synchronized ( cacheDesc )
816 {
817 boolean removeSuccess = false;
818
819
820 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
821
822 if ( fromCluster )
823 {
824 log.debug( "Remove FROM cluster, NOT updating other auxiliaries for region" );
825 removeSuccess = c.localRemove( key );
826 }
827 else
828 {
829 log.debug( "Remove NOT from cluster, updating other auxiliaries for region" );
830 removeSuccess = c.remove( key );
831 }
832
833 log.debug( "remove [{0}] from cache [{1}] success (was it found) = {2}",
834 key, cacheName, removeSuccess );
835
836
837
838 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency())
839 {
840 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
841
842 for (final ICacheEventQueue<K, V> element : qlist) {
843 element.addRemoveEvent( key );
844 }
845 }
846 }
847 }
848 }
849
850
851
852
853
854
855
856 @Override
857 public void removeAll( final String cacheName )
858 throws IOException
859 {
860 removeAll( cacheName, 0 );
861 }
862
863
864
865
866
867
868
869
870
871
872 @Override
873 public void removeAll( final String cacheName, final long requesterId )
874 throws IOException
875 {
876 final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "all", requesterId, ICacheEventLogger.REMOVEALL_EVENT );
877 try
878 {
879 processRemoveAll( cacheName, requesterId );
880 }
881 finally
882 {
883 logICacheEvent( cacheEvent );
884 }
885 }
886
887
888
889
890
891
892
893
894 private void processRemoveAll( final String cacheName, final long requesterId )
895 throws IOException
896 {
897 final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
898
899 final boolean fromCluster = isRequestFromCluster( requesterId );
900
901 if ( cacheDesc != null )
902 {
903
904
905 synchronized ( cacheDesc )
906 {
907
908 final CompositeCache<K, V> c = (CompositeCache<K, V>) cacheDesc.cache;
909
910 if ( fromCluster )
911 {
912 log.debug( "RemoveALL FROM cluster, NOT updating other auxiliaries for region" );
913 c.localRemoveAll();
914 }
915 else
916 {
917 log.debug( "RemoveALL NOT from cluster, updating other auxiliaries for region" );
918 c.removeAll();
919 }
920
921
922 if (!fromCluster || fromCluster && remoteCacheServerAttributes.isLocalClusterConsistency())
923 {
924 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
925
926 for (final ICacheEventQueue<K, V> q : qlist)
927 {
928 q.addRemoveAllEvent();
929 }
930 }
931 }
932 }
933 }
934
935
936
937
938
939
940
941 int getPutCount()
942 {
943 return puts;
944 }
945
946
947
948
949
950
951
952 @Override
953 public void dispose( final String cacheName )
954 throws IOException
955 {
956 dispose( cacheName, 0 );
957 }
958
959
960
961
962
963
964
965
966 public void dispose( final String cacheName, final long requesterId )
967 throws IOException
968 {
969 final ICacheEvent<String> cacheEvent = createICacheEvent( cacheName, "none", requesterId, ICacheEventLogger.DISPOSE_EVENT );
970 try
971 {
972 processDispose( cacheName, requesterId );
973 }
974 finally
975 {
976 logICacheEvent( cacheEvent );
977 }
978 }
979
980
981
982
983
984
985 private void processDispose( final String cacheName, final long requesterId )
986 throws IOException
987 {
988 log.info( "Dispose request received from listener [{0}]", requesterId );
989
990 final CacheListeners<K, V> cacheDesc = cacheListenersMap.get( cacheName );
991
992
993 if ( cacheDesc != null )
994 {
995
996 synchronized ( cacheDesc )
997 {
998 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, requesterId );
999
1000 for (final ICacheEventQueue<K, V> element : qlist) {
1001 element.addDisposeEvent();
1002 }
1003 cacheManager.freeCache( cacheName );
1004 }
1005 }
1006 }
1007
1008
1009
1010
1011
1012
1013 @Override
1014 public void release()
1015 throws IOException
1016 {
1017 for (final CacheListeners<K, V> cacheDesc : cacheListenersMap.values())
1018 {
1019 final ICacheEventQueue<K, V>[] qlist = getEventQList( cacheDesc, 0 );
1020
1021 for (final ICacheEventQueue<K, V> element : qlist) {
1022 element.addDisposeEvent();
1023 }
1024 }
1025 cacheManager.release();
1026 }
1027
1028
1029
1030
1031
1032
1033
1034
1035 protected CacheListeners<K, V> getCacheListeners( final String cacheName )
1036 {
1037
1038 return cacheListenersMap.computeIfAbsent(cacheName, key -> {
1039 final CompositeCache<K, V> cache = cacheManager.getCache(key);
1040 return new CacheListeners<>( cache );
1041 });
1042 }
1043
1044
1045
1046
1047
1048
1049
1050
1051 protected CacheListeners<K, V> getClusterListeners( final String cacheName )
1052 {
1053
1054 return clusterListenersMap.computeIfAbsent(cacheName, key -> {
1055 final CompositeCache<K, V> cache = cacheManager.getCache( cacheName );
1056 return new CacheListeners<>( cache );
1057 });
1058 }
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072 @SuppressWarnings("unchecked")
1073 private ICacheEventQueue<K, V>[] getEventQList( final CacheListeners<K, V> cacheListeners, final long requesterId )
1074 {
1075 final ICacheEventQueue<K, V>[] list = cacheListeners.eventQMap.values().toArray( new ICacheEventQueue[0] );
1076 int count = 0;
1077
1078 for ( int i = 0; i < list.length; i++ )
1079 {
1080 final ICacheEventQueue<K, V> q = list[i];
1081 if ( q.isWorking() && q.getListenerId() != requesterId )
1082 {
1083 count++;
1084 }
1085 else
1086 {
1087 list[i] = null;
1088 }
1089 }
1090 if ( count == list.length )
1091 {
1092
1093 return list;
1094 }
1095
1096
1097 final ICacheEventQueue<K, V>[] qq = new ICacheEventQueue[count];
1098 count = 0;
1099 for (final ICacheEventQueue<K, V> element : list) {
1100 if ( element != null )
1101 {
1102 qq[count++] = element;
1103 }
1104 }
1105 return qq;
1106 }
1107
1108
1109
1110
1111
1112
1113 private static <KK, VV> void cleanupEventQMap( final Map<Long, ICacheEventQueue<KK, VV>> eventQMap )
1114 {
1115
1116
1117
1118
1119 eventQMap.entrySet().removeIf(e -> !e.getValue().isWorking());
1120 }
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133 @Override
1134 @SuppressWarnings("unchecked")
1135 public <KK, VV> void addCacheListener( final String cacheName, final ICacheListener<KK, VV> listener )
1136 throws IOException
1137 {
1138 if ( cacheName == null || listener == null )
1139 {
1140 throw new IllegalArgumentException( "cacheName and listener must not be null" );
1141 }
1142 final CacheListeners<KK, VV> cacheListeners;
1143
1144 final IRemoteCacheListener<KK, VV> ircl = (IRemoteCacheListener<KK, VV>) listener;
1145
1146 final String listenerAddress = ircl.getLocalHostAddress();
1147
1148 final RemoteType remoteType = ircl.getRemoteType();
1149 if ( remoteType == RemoteType.CLUSTER )
1150 {
1151 log.debug( "adding cluster listener, listenerAddress [{0}]", listenerAddress );
1152 cacheListeners = (CacheListeners<KK, VV>)getClusterListeners( cacheName );
1153 }
1154 else
1155 {
1156 log.debug( "adding normal listener, listenerAddress [{0}]", listenerAddress );
1157 cacheListeners = (CacheListeners<KK, VV>)getCacheListeners( cacheName );
1158 }
1159 final Map<Long, ICacheEventQueue<KK, VV>> eventQMap = cacheListeners.eventQMap;
1160 cleanupEventQMap( eventQMap );
1161
1162
1163 synchronized ( ICacheListener.class )
1164 {
1165 long id = 0;
1166 try
1167 {
1168 id = listener.getListenerId();
1169
1170 if ( id == 0 )
1171 {
1172
1173 final long listenerIdB = nextListenerId();
1174 log.debug( "listener id={0} addded for cache [{1}], listenerAddress [{2}]",
1175 listenerIdB & 0xff, cacheName, listenerAddress );
1176 listener.setListenerId( listenerIdB );
1177 id = listenerIdB;
1178
1179
1180 final String message = "Adding vm listener under new id = [" + listenerIdB + "], listenerAddress ["
1181 + listenerAddress + "]";
1182 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1183 log.info( message );
1184 }
1185 else
1186 {
1187 final String message = "Adding listener under existing id = [" + id + "], listenerAddress ["
1188 + listenerAddress + "]";
1189 logApplicationEvent( "RemoteCacheServer", "addCacheListener", message );
1190 log.info( message );
1191
1192
1193 }
1194
1195
1196 this.idTypeMap.put( Long.valueOf( id ), remoteType);
1197 if ( listenerAddress != null )
1198 {
1199 this.idIPMap.put( Long.valueOf( id ), listenerAddress );
1200 }
1201 }
1202 catch ( final IOException ioe )
1203 {
1204 final String message = "Problem setting listener id, listenerAddress [" + listenerAddress + "]";
1205 log.error( message, ioe );
1206
1207 if ( cacheEventLogger != null )
1208 {
1209 cacheEventLogger.logError( "RemoteCacheServer", "addCacheListener", message + " - "
1210 + ioe.getMessage() );
1211 }
1212 }
1213
1214 final CacheEventQueueFactory<KK, VV> fact = new CacheEventQueueFactory<>();
1215 final ICacheEventQueue<KK, VV> q = fact.createCacheEventQueue( listener, id, cacheName, remoteCacheServerAttributes
1216 .getEventQueuePoolName(), remoteCacheServerAttributes.getEventQueueType() );
1217
1218 eventQMap.put(Long.valueOf(listener.getListenerId()), q);
1219
1220 log.info( cacheListeners );
1221 }
1222 }
1223
1224
1225
1226
1227
1228
1229
1230 @Override
1231 public <KK, VV> void addCacheListener( final ICacheListener<KK, VV> listener )
1232 throws IOException
1233 {
1234 for (final String cacheName : cacheListenersMap.keySet())
1235 {
1236 addCacheListener( cacheName, listener );
1237
1238 log.debug( "Adding listener for cache [{0}]", cacheName );
1239 }
1240 }
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250 @Override
1251 public <KK, VV> void removeCacheListener( final String cacheName, final ICacheListener<KK, VV> listener )
1252 throws IOException
1253 {
1254 removeCacheListener( cacheName, listener.getListenerId() );
1255 }
1256
1257
1258
1259
1260
1261
1262
1263
1264 public void removeCacheListener( final String cacheName, final long listenerId )
1265 {
1266 final String message = "Removing listener for cache region = [" + cacheName + "] and listenerId [" + listenerId + "]";
1267 logApplicationEvent( "RemoteCacheServer", "removeCacheListener", message );
1268 log.info( message );
1269
1270 final boolean isClusterListener = isRequestFromCluster( listenerId );
1271
1272 CacheListeners<K, V> cacheDesc = null;
1273
1274 if ( isClusterListener )
1275 {
1276 cacheDesc = getClusterListeners( cacheName );
1277 }
1278 else
1279 {
1280 cacheDesc = getCacheListeners( cacheName );
1281 }
1282 final Map<Long, ICacheEventQueue<K, V>> eventQMap = cacheDesc.eventQMap;
1283 cleanupEventQMap( eventQMap );
1284 final ICacheEventQueue<K, V> q = eventQMap.remove( Long.valueOf( listenerId ) );
1285
1286 if ( q != null )
1287 {
1288 log.debug( "Found queue for cache region = [{0}] and listenerId [{1}]",
1289 cacheName, listenerId );
1290 q.destroy();
1291 cleanupEventQMap( eventQMap );
1292 }
1293 else
1294 {
1295 log.debug( "Did not find queue for cache region = [{0}] and listenerId [{1}]",
1296 cacheName, listenerId );
1297 }
1298
1299
1300 idTypeMap.remove( Long.valueOf( listenerId ) );
1301 idIPMap.remove( Long.valueOf( listenerId ) );
1302
1303 log.info( "After removing listener [{0}] cache region {1} listener size [{2}]",
1304 listenerId, cacheName, eventQMap.size() );
1305 }
1306
1307
1308
1309
1310
1311
1312
1313 @Override
1314 public <KK, VV> void removeCacheListener( final ICacheListener<KK, VV> listener )
1315 throws IOException
1316 {
1317 for (final String cacheName : cacheListenersMap.keySet())
1318 {
1319 removeCacheListener( cacheName, listener );
1320
1321 log.info( "Removing listener for cache [{0}]", cacheName );
1322 }
1323 }
1324
1325
1326
1327
1328
1329
1330 @Override
1331 public void shutdown()
1332 throws IOException
1333 {
1334 shutdown("", Registry.REGISTRY_PORT);
1335 }
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345 @Override
1346 public void shutdown( final String host, final int port )
1347 throws IOException
1348 {
1349 log.info( "Received shutdown request. Shutting down server." );
1350
1351 synchronized (listenerId)
1352 {
1353 for (final String cacheName : cacheListenersMap.keySet())
1354 {
1355 for (int i = 0; i <= listenerId[0]; i++)
1356 {
1357 removeCacheListener( cacheName, i );
1358 }
1359
1360 log.info( "Removing listener for cache [{0}]", cacheName );
1361 }
1362
1363 cacheListenersMap.clear();
1364 clusterListenersMap.clear();
1365 }
1366 RemoteCacheServerFactory.shutdownImpl( host, port );
1367 this.cacheManager.shutDown();
1368 }
1369
1370
1371
1372
1373
1374
1375 @Override
1376 public void unreferenced()
1377 {
1378 log.info( "*** Server now unreferenced and subject to GC. ***" );
1379 }
1380
1381
1382
1383
1384
1385
1386 private long nextListenerId()
1387 {
1388 long id = 0;
1389 if ( listenerId[0] == Integer.MAX_VALUE )
1390 {
1391 synchronized ( listenerId )
1392 {
1393 id = listenerId[0];
1394 listenerId[0] = 0;
1395
1396
1397
1398
1399
1400 }
1401 }
1402 else
1403 {
1404 synchronized ( listenerId )
1405 {
1406 id = ++listenerId[0];
1407 }
1408 }
1409 return id;
1410 }
1411
1412
1413
1414
1415
1416
1417
1418 @Override
1419 public String getStats()
1420 throws IOException
1421 {
1422 return cacheManager.getStats();
1423 }
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433 private ICacheEvent<ICacheElement<K, V>> createICacheEvent( final ICacheElement<K, V> item, final long requesterId, final String eventName )
1434 {
1435 if ( cacheEventLogger == null )
1436 {
1437 return new CacheEvent<>();
1438 }
1439 final String ipAddress = getExtraInfoForRequesterId( requesterId );
1440 return cacheEventLogger
1441 .createICacheEvent( "RemoteCacheServer", item.getCacheName(), eventName, ipAddress, item );
1442 }
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453 private <T> ICacheEvent<T> createICacheEvent( final String cacheName, final T key, final long requesterId, final String eventName )
1454 {
1455 if ( cacheEventLogger == null )
1456 {
1457 return new CacheEvent<>();
1458 }
1459 final String ipAddress = getExtraInfoForRequesterId( requesterId );
1460 return cacheEventLogger.createICacheEvent( "RemoteCacheServer", cacheName, eventName, ipAddress, key );
1461 }
1462
1463
1464
1465
1466
1467
1468
1469
1470 protected void logApplicationEvent( final String source, final String eventName, final String optionalDetails )
1471 {
1472 if ( cacheEventLogger != null )
1473 {
1474 cacheEventLogger.logApplicationEvent( source, eventName, optionalDetails );
1475 }
1476 }
1477
1478
1479
1480
1481
1482
1483 protected <T> void logICacheEvent( final ICacheEvent<T> cacheEvent )
1484 {
1485 if ( cacheEventLogger != null )
1486 {
1487 cacheEventLogger.logICacheEvent( cacheEvent );
1488 }
1489 }
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499 protected String getExtraInfoForRequesterId( final long requesterId )
1500 {
1501 return idIPMap.get( Long.valueOf( requesterId ) );
1502 }
1503
1504
1505
1506
1507
1508
1509 public void setCacheEventLogger( final ICacheEventLogger cacheEventLogger )
1510 {
1511 this.cacheEventLogger = cacheEventLogger;
1512 }
1513 }