1 package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.concurrent.ConcurrentHashMap;
25
26 import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheFactory;
27 import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
28 import org.apache.commons.jcs3.auxiliary.lateral.LateralCache;
29 import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheMonitor;
30 import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWait;
31 import org.apache.commons.jcs3.auxiliary.lateral.LateralCacheNoWaitFacade;
32 import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
33 import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34 import org.apache.commons.jcs3.engine.CacheWatchRepairable;
35 import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
36 import org.apache.commons.jcs3.engine.ZombieCacheWatch;
37 import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
38 import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
39 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
40 import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
41 import org.apache.commons.jcs3.engine.control.CompositeCacheManager;
42 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
43 import org.apache.commons.jcs3.log.Log;
44 import org.apache.commons.jcs3.log.LogManager;
45 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryManager;
46 import org.apache.commons.jcs3.utils.discovery.UDPDiscoveryService;
47 import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
48
49
50
51
52
53
54
55
56 public class LateralTCPCacheFactory
57 extends AbstractAuxiliaryCacheFactory
58 {
59
/** Logger for this factory. */
private static final Log log = LogManager.getLog( LateralTCPCacheFactory.class );

/**
 * Lateral cache services, keyed by the TCP server string of the attributes
 * they were created for. The map is created in {@code initialize()}.
 */
private ConcurrentHashMap<String, ICacheServiceNonLocal<?, ?>> csnlInstances;

/**
 * Discovery listeners, keyed by {@code udpDiscoveryAddr + ":" + udpDiscoveryPort}.
 * The map is created in {@code initialize()}.
 */
private ConcurrentHashMap<String, LateralTCPDiscoveryListener> lTCPDLInstances;

/**
 * Monitor that is started as a daemon in {@code initialize()} and shut down
 * in {@code dispose()}; it is notified when a service could not be created.
 */
private LateralCacheMonitor monitor;

/**
 * Holder that lateral cache listeners are registered with. It is set up with
 * a {@code ZombieCacheWatch} in {@code initialize()}.
 */
private CacheWatchRepairable lateralWatch;
76
77
78
79
80
81
82
83
84
85
86
87
88 @Override
89 public <K, V> LateralCacheNoWaitFacade<K, V> createCache(
90 final AuxiliaryCacheAttributes iaca, final ICompositeCacheManager cacheMgr,
91 final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
92 {
93 final ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
94 final ArrayList<LateralCacheNoWait<K, V>> noWaits = new ArrayList<>();
95
96
97
98
99 if (lac.getTcpServers() != null && !lac.getTcpServers().isEmpty())
100 {
101 final String servers[] = lac.getTcpServers().split("\\s*,\\s*");
102 log.debug( "Configured for [{0}] servers.", servers.length );
103
104 for (final String server : servers)
105 {
106 log.debug( "tcp server = {0}", server );
107 final ITCPLateralCacheAttributes lacClone = (ITCPLateralCacheAttributes) lac.clone();
108 lacClone.setTcpServer( server );
109
110 final LateralCacheNoWait<K, V> lateralNoWait = createCacheNoWait(lacClone, cacheEventLogger, elementSerializer);
111
112 addListenerIfNeeded( lacClone, cacheMgr, elementSerializer );
113 monitorCache(lateralNoWait);
114 noWaits.add( lateralNoWait );
115 }
116 }
117
118 final ILateralCacheListener<K, V> listener = createListener( lac, cacheMgr, elementSerializer );
119
120
121 final LateralCacheNoWaitFacade<K, V> lcnwf =
122 new LateralCacheNoWaitFacade<>(listener, noWaits, lac);
123
124
125 createDiscoveryService( lac, lcnwf, cacheMgr, cacheEventLogger, elementSerializer );
126
127 return lcnwf;
128 }
129
130
131
132
133
134
135
136
137
138
139
140 public <K, V> LateralCacheNoWait<K, V> createCacheNoWait( final ITCPLateralCacheAttributes lca,
141 final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer )
142 {
143 final ICacheServiceNonLocal<K, V> lateralService = getCSNLInstance(lca, elementSerializer);
144
145 final LateralCache<K, V> cache = new LateralCache<>( lca, lateralService, this.monitor );
146 cache.setCacheEventLogger( cacheEventLogger );
147 cache.setElementSerializer( elementSerializer );
148
149 log.debug( "Created cache for noWait, cache [{0}]", cache );
150
151 final LateralCacheNoWait<K, V> lateralNoWait = new LateralCacheNoWait<>( cache );
152 lateralNoWait.setIdentityKey(lca.getTcpServer());
153
154 log.info( "Created LateralCacheNoWait for [{0}] LateralCacheNoWait = [{1}]",
155 lca, lateralNoWait );
156
157 return lateralNoWait;
158 }
159
160
161
162
163 @Override
164 public void initialize()
165 {
166 this.csnlInstances = new ConcurrentHashMap<>();
167 this.lTCPDLInstances = new ConcurrentHashMap<>();
168
169
170 this.monitor = new LateralCacheMonitor(this);
171 this.monitor.setDaemon( true );
172 this.monitor.start();
173
174 this.lateralWatch = new CacheWatchRepairable();
175 this.lateralWatch.setCacheWatch( new ZombieCacheWatch() );
176 }
177
178
179
180
181 @Override
182 public void dispose()
183 {
184 for (final ICacheServiceNonLocal<?, ?> service : this.csnlInstances.values())
185 {
186 try
187 {
188 service.dispose("");
189 }
190 catch (final IOException e)
191 {
192 log.error("Could not dispose service " + service, e);
193 }
194 }
195
196 this.csnlInstances.clear();
197
198
199 this.lTCPDLInstances.clear();
200
201 if (this.monitor != null)
202 {
203 this.monitor.notifyShutdown();
204 try
205 {
206 this.monitor.join(5000);
207 }
208 catch (final InterruptedException e)
209 {
210
211 }
212 this.monitor = null;
213 }
214 }
215
216
217
218
219
220
221
222
223
224
225
226
227 @Deprecated
228 public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance( final ITCPLateralCacheAttributes lca )
229 {
230 return getCSNLInstance(lca, new StandardSerializer());
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244
245 @SuppressWarnings("unchecked")
246 public <K, V> ICacheServiceNonLocal<K, V> getCSNLInstance(final ITCPLateralCacheAttributes lca,
247 final IElementSerializer elementSerializer)
248 {
249 final String key = lca.getTcpServer();
250
251 return (ICacheServiceNonLocal<K, V>) csnlInstances.compute(key, (name, service) -> {
252
253 ICacheServiceNonLocal<?, ?> newService = service;
254
255
256 if (service instanceof ZombieCacheServiceNonLocal)
257 {
258 log.info("Disposing of zombie service instance for [{0}]", name);
259 newService = null;
260 }
261
262 if (newService == null)
263 {
264 log.info( "Instance for [{0}] is null, creating", name );
265
266
267 try
268 {
269 log.info( "Creating TCP service, lca = {0}", lca );
270
271 newService = new LateralTCPService<>(lca, elementSerializer);
272 }
273 catch ( final IOException ex )
274 {
275
276
277
278 log.error( "Failure, lateral instance will use zombie service", ex );
279
280 newService = new ZombieCacheServiceNonLocal<>(lca.getZombieQueueMaxSize());
281
282
283
284 monitor.notifyError();
285 }
286 }
287
288 return newService;
289 });
290 }
291
292
293
294
295
296
297
298 public void monitorCache(final LateralCacheNoWait<?, ?> cache)
299 {
300 monitor.addCache(cache);
301 }
302
303
304
305
306
307
308
309
310
311
312
313 private LateralTCPDiscoveryListener getDiscoveryListener(final ITCPLateralCacheAttributes ilca,
314 final ICompositeCacheManager cacheManager, final ICacheEventLogger cacheEventLogger,
315 final IElementSerializer elementSerializer)
316 {
317 final String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
318
319 return lTCPDLInstances.computeIfAbsent(key, key1 -> {
320 log.info("Created new discovery listener for cacheName {0} and request {1}",
321 ilca.getCacheName(), key1);
322 return new LateralTCPDiscoveryListener( this.getName(),
323 (CompositeCacheManager) cacheManager,
324 cacheEventLogger, elementSerializer);
325 });
326 }
327
328
329
330
331
332
333
334
335 private void addListenerIfNeeded( final ITCPLateralCacheAttributes iaca, final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer )
336 {
337
338 if ( iaca.isReceive() )
339 {
340 try
341 {
342 addLateralCacheListener(iaca.getCacheName(), createListener(iaca, cacheMgr, elementSerializer));
343 }
344 catch ( final IOException ioe )
345 {
346 log.error("Problem creating lateral listener", ioe);
347 }
348 }
349 else
350 {
351 log.debug( "Not creating a listener since we are not receiving." );
352 }
353 }
354
355
356
357
358
359
360
361
362 private <K, V> void addLateralCacheListener( final String cacheName, final ILateralCacheListener<K, V> listener )
363 throws IOException
364 {
365 synchronized ( this.lateralWatch )
366 {
367 lateralWatch.addCacheListener( cacheName, listener );
368 }
369 }
370
371
372
373
374
375
376
377
378
379
380
381
382
383 private static <K, V> ILateralCacheListener<K, V> createListener( final ITCPLateralCacheAttributes attr,
384 final ICompositeCacheManager cacheMgr, final IElementSerializer elementSerializer )
385 {
386 ILateralCacheListener<K, V> listener = null;
387
388
389 if ( attr.isReceive() )
390 {
391 log.info( "Getting listener for {0}", attr );
392
393
394 listener = LateralTCPListener.getInstance( attr, cacheMgr, elementSerializer );
395
396
397 cacheMgr.registerShutdownObserver( (IShutdownObserver) listener );
398 }
399 else
400 {
401 log.debug( "Not creating a listener since we are not receiving." );
402 }
403
404 return listener;
405 }
406
407
408
409
410
411
412
413
414
415
416 private synchronized <K, V> void createDiscoveryService(
417 final ITCPLateralCacheAttributes lac,
418 final LateralCacheNoWaitFacade<K, V> lcnwf,
419 final ICompositeCacheManager cacheMgr,
420 final ICacheEventLogger cacheEventLogger,
421 final IElementSerializer elementSerializer )
422 {
423 UDPDiscoveryService discovery = null;
424
425
426 if ( lac.isUdpDiscoveryEnabled() )
427 {
428
429 final LateralTCPDiscoveryListener discoveryListener =
430 getDiscoveryListener(lac, cacheMgr, cacheEventLogger, elementSerializer);
431 discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
432
433
434
435 discovery = UDPDiscoveryManager.getInstance().getService(
436 lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(),
437 lac.getTcpListenerHost(), lac.getTcpListenerPort(), lac.getUdpTTL(),
438 cacheMgr, elementSerializer);
439
440 discovery.addParticipatingCacheName( lac.getCacheName() );
441 discovery.addDiscoveryListener( discoveryListener );
442
443 log.info( "Registered TCP lateral cache [{0}] with UDPDiscoveryService.",
444 lac::getCacheName);
445 }
446 }
447 }