1 package org.apache.commons.jcs3.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheEventLogging;
35 import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
36 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
37 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
38 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
39 import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType;
40 import org.apache.commons.jcs3.engine.CacheStatus;
41 import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
42 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
43 import org.apache.commons.jcs3.engine.behavior.ICacheElementSerialized;
44 import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
45 import org.apache.commons.jcs3.engine.behavior.IZombie;
46 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
47 import org.apache.commons.jcs3.engine.stats.StatElement;
48 import org.apache.commons.jcs3.engine.stats.Stats;
49 import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
50 import org.apache.commons.jcs3.engine.stats.behavior.IStats;
51 import org.apache.commons.jcs3.log.Log;
52 import org.apache.commons.jcs3.log.LogManager;
53 import org.apache.commons.jcs3.utils.serialization.SerializationConversionUtil;
54 import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
55
56
57 public abstract class AbstractRemoteAuxiliaryCache<K, V>
58 extends AbstractAuxiliaryCacheEventLogging<K, V>
59 implements IRemoteCacheClient<K, V>
60 {
61
62 private static final Log log = LogManager.getLog( AbstractRemoteAuxiliaryCache.class );
63
64
65
66
67
68 private ICacheServiceNonLocal<K, V> remoteCacheService;
69
70
71 protected final String cacheName;
72
73
74 private IRemoteCacheListener<K, V> remoteCacheListener;
75
76
77 private IRemoteCacheAttributes remoteCacheAttributes;
78
79
80 private ExecutorService pool;
81
82
83 private boolean usePoolForGet;
84
85
86
87
88
89
90
91
92 public AbstractRemoteAuxiliaryCache( final IRemoteCacheAttributes cattr, final ICacheServiceNonLocal<K, V> remote,
93 final IRemoteCacheListener<K, V> listener )
94 {
95 this.setRemoteCacheAttributes( cattr );
96 this.cacheName = cattr.getCacheName();
97 this.setRemoteCacheService( remote );
98 this.setRemoteCacheListener( listener );
99
100 if ( log.isDebugEnabled() )
101 {
102 log.debug( "Construct> cacheName={0}", cattr::getCacheName);
103 log.debug( "irca = {0}", this::getRemoteCacheAttributes);
104 log.debug( "remote = {0}", remote );
105 log.debug( "listener = {0}", listener );
106 }
107
108
109 log.debug( "GetTimeoutMillis() = {0}",
110 () -> getRemoteCacheAttributes().getGetTimeoutMillis() );
111
112 if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
113 {
114 pool = ThreadPoolManager.getInstance().getExecutorService( getRemoteCacheAttributes().getThreadPoolName() );
115 log.debug( "Thread Pool = {0}", pool );
116 usePoolForGet = true;
117 }
118 }
119
120
121
122
123
124
125 @Override
126 protected void processDispose()
127 throws IOException
128 {
129 log.info( "Disposing of remote cache." );
130 try
131 {
132 if ( getRemoteCacheListener() != null )
133 {
134 getRemoteCacheListener().dispose();
135 }
136 }
137 catch ( final IOException ex )
138 {
139 log.error( "Couldn't dispose", ex );
140 handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
141 }
142 }
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 @Override
158 protected ICacheElement<K, V> processGet( final K key )
159 throws IOException
160 {
161 ICacheElement<K, V> retVal = null;
162 try
163 {
164 if ( usePoolForGet )
165 {
166 retVal = getUsingPool( key );
167 }
168 else
169 {
170 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
171 }
172
173
174
175
176
177 if (retVal instanceof ICacheElementSerialized && this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER)
178 {
179 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
180 super.getElementSerializer() );
181 }
182 }
183 catch ( final IOException | ClassNotFoundException ex )
184 {
185 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
186 }
187 return retVal;
188 }
189
190
191
192
193
194
195
196
197 public ICacheElement<K, V> getUsingPool( final K key )
198 throws IOException
199 {
200 final int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
201
202 try
203 {
204 final Callable<ICacheElement<K, V>> command = () -> getRemoteCacheService().get( cacheName, key, getListenerId() );
205
206
207 final Future<ICacheElement<K, V>> future = pool.submit(command);
208
209
210 final ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
211
212 if ( ice == null )
213 {
214 log.debug( "nothing found in remote cache" );
215 }
216 else
217 {
218 log.debug( "found item in remote cache" );
219 }
220 return ice;
221 }
222 catch ( final TimeoutException te )
223 {
224 log.warn( "TimeoutException, Get Request timed out after {0}", timeout );
225 throw new IOException( "Get Request timed out after " + timeout );
226 }
227 catch ( final InterruptedException ex )
228 {
229 log.warn( "InterruptedException, Get Request timed out after {0}", timeout );
230 throw new IOException( "Get Request timed out after " + timeout );
231 }
232 catch (final ExecutionException ex)
233 {
234
235 log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
236 throw new IOException( "Get Request timed out after " + timeout );
237 }
238 }
239
240
241
242
243
244
245
246
247 @Override
248 public Map<K, ICacheElement<K, V>> processGetMatching( final String pattern )
249 throws IOException
250 {
251 final Map<K, ICacheElement<K, V>> results = new HashMap<>();
252 try
253 {
254 final Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
255
256
257 if ( rawResults != null )
258 {
259 for (final Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
260 {
261 ICacheElement<K, V> unwrappedResult = null;
262 if ( entry.getValue() instanceof ICacheElementSerialized )
263 {
264
265
266
267 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
268 {
269 unwrappedResult = SerializationConversionUtil
270 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
271 super.getElementSerializer() );
272 }
273 }
274 else
275 {
276 unwrappedResult = entry.getValue();
277 }
278 results.put( entry.getKey(), unwrappedResult );
279 }
280 }
281 }
282 catch ( final IOException | ClassNotFoundException ex )
283 {
284 handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
285 ICacheEventLogger.GET_EVENT );
286 }
287 return results;
288 }
289
290
291
292
293
294
295
296
297
298 @Override
299 protected boolean processRemove( final K key )
300 throws IOException
301 {
302 if ( !this.getRemoteCacheAttributes().getGetOnly() )
303 {
304 log.debug( "remove> key={0}", key );
305 try
306 {
307 getRemoteCacheService().remove( cacheName, key, getListenerId() );
308 }
309 catch ( final IOException ex )
310 {
311 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
312 }
313 return true;
314 }
315 return false;
316 }
317
318
319
320
321
322
323
324 @Override
325 protected void processRemoveAll()
326 throws IOException
327 {
328 if ( !this.getRemoteCacheAttributes().getGetOnly() )
329 {
330 try
331 {
332 getRemoteCacheService().removeAll( cacheName, getListenerId() );
333 }
334 catch ( final IOException ex )
335 {
336 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
337 }
338 }
339 }
340
341
342
343
344
345
346
347
348
349 @Override
350 protected void processUpdate( final ICacheElement<K, V> ce )
351 throws IOException
352 {
353 if ( !getRemoteCacheAttributes().getGetOnly() )
354 {
355 try
356 {
357 log.debug( "sending item to remote server" );
358
359
360
361 ICacheElementSerialized<K, V> serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() );
362
363 remoteCacheService.update( serialized, getListenerId() );
364 }
365 catch ( final IOException ex )
366 {
367
368 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
369 ICacheEventLogger.UPDATE_EVENT );
370 }
371 }
372 else
373 {
374 log.debug( "get only mode, not sending to remote server" );
375 }
376 }
377
378
379
380
381
382
383 @Override
384 public Set<K> getKeySet()
385 throws IOException
386 {
387 return getRemoteCacheService().getKeySet(cacheName);
388 }
389
390
391
392
393
394
395
396 @Override
397 public IRemoteCacheListener<K, V> getListener()
398 {
399 return getRemoteCacheListener();
400 }
401
402
403
404
405
406
407
408
409 public void setListenerId( final long id )
410 {
411 if ( getRemoteCacheListener() != null )
412 {
413 try
414 {
415 getRemoteCacheListener().setListenerId( id );
416
417 log.debug( "set listenerId = {0}", id );
418 }
419 catch ( final IOException e )
420 {
421 log.error( "Problem setting listenerId", e );
422 }
423 }
424 }
425
426
427
428
429
430
431 @Override
432 public long getListenerId()
433 {
434 if ( getRemoteCacheListener() != null )
435 {
436 try
437 {
438 log.debug( "get listenerId = {0}", getRemoteCacheListener().getListenerId() );
439 return getRemoteCacheListener().getListenerId();
440 }
441 catch ( final IOException e )
442 {
443 log.error( "Problem getting listenerId", e );
444 }
445 }
446 return -1;
447 }
448
449
450
451
452
453 @Override
454 public int getSize()
455 {
456 return 0;
457 }
458
459
460
461
462
463
464
465
466
467 protected abstract void handleException( Exception ex, String msg, String eventName )
468 throws IOException;
469
470
471
472
473
474
475 @Override
476 public String getStats()
477 {
478 return getStatistics().toString();
479 }
480
481
482
483
484 @Override
485 public IStats getStatistics()
486 {
487 final IStats stats = new Stats();
488 stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
489
490 final ArrayList<IStatElement<?>> elems = new ArrayList<>();
491
492 elems.add(new StatElement<>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) );
493
494
495
496
497
498
499 elems.add(new StatElement<>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) );
500
501 if ( pool != null )
502 {
503 elems.add(new StatElement<>( "Pool", pool ) );
504 }
505
506 if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
507 {
508 elems.add(new StatElement<>( "Zombie Queue Size",
509 Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) );
510 }
511
512 stats.setStatElements( elems );
513
514 return stats;
515 }
516
517
518
519
520
521
522 @Override
523 public CacheStatus getStatus()
524 {
525 return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
526 }
527
528
529
530
531
532
533
534 @Override
535 public void fixCache( final ICacheServiceNonLocal<?, ?> restoredRemote )
536 {
537 @SuppressWarnings("unchecked")
538 final
539 ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
540 final ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService();
541 if ( prevRemote instanceof ZombieCacheServiceNonLocal )
542 {
543 final ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote;
544 setRemoteCacheService( remote );
545 try
546 {
547 zombie.propagateEvents( remote );
548 }
549 catch ( final Exception e )
550 {
551 try
552 {
553 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
554 "fixCache" );
555 }
556 catch ( final IOException e1 )
557 {
558
559 }
560 }
561 }
562 else
563 {
564 setRemoteCacheService( remote );
565 }
566 }
567
568
569
570
571
572
573 @Override
574 public CacheType getCacheType()
575 {
576 return CacheType.REMOTE_CACHE;
577 }
578
579
580
581
582
583
584 @Override
585 public String getCacheName()
586 {
587 return cacheName;
588 }
589
590
591
592
593 protected void setRemoteCacheService( final ICacheServiceNonLocal<K, V> remote )
594 {
595 this.remoteCacheService = remote;
596 }
597
598
599
600
601 protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
602 {
603 return remoteCacheService;
604 }
605
606
607
608
609 @Override
610 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
611 {
612 return getRemoteCacheAttributes();
613 }
614
615
616
617
618 protected void setRemoteCacheAttributes( final IRemoteCacheAttributes remoteCacheAttributes )
619 {
620 this.remoteCacheAttributes = remoteCacheAttributes;
621 }
622
623
624
625
626 protected IRemoteCacheAttributes getRemoteCacheAttributes()
627 {
628 return remoteCacheAttributes;
629 }
630
631
632
633
634 protected void setRemoteCacheListener( final IRemoteCacheListener<K, V> remoteCacheListener )
635 {
636 this.remoteCacheListener = remoteCacheListener;
637 }
638
639
640
641
642 protected IRemoteCacheListener<K, V> getRemoteCacheListener()
643 {
644 return remoteCacheListener;
645 }
646 }