1 package org.apache.commons.jcs3.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.rmi.Naming;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26
27 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
28 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
29 import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
30 import org.apache.commons.jcs3.engine.CacheStatus;
31 import org.apache.commons.jcs3.engine.CacheWatchRepairable;
32 import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
33 import org.apache.commons.jcs3.engine.ZombieCacheWatch;
34 import org.apache.commons.jcs3.engine.behavior.ICacheObserver;
35 import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
36 import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
37 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
38 import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
39 import org.apache.commons.jcs3.log.Log;
40 import org.apache.commons.jcs3.log.LogManager;
41
42
43
44
45
46
47
48
49
50 public class RemoteCacheManager
51 {
52
53 private static final Log log = LogManager.getLog( RemoteCacheManager.class );
54
55
56 private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
57 new ConcurrentHashMap<>();
58
59
60 private final ICacheEventLogger cacheEventLogger;
61
62
63 private final IElementSerializer elementSerializer;
64
65
66 private ICacheServiceNonLocal<?, ?> remoteService;
67
68
69
70
71
72 private final CacheWatchRepairable remoteWatch;
73
74
75 private final ICompositeCacheManager cacheMgr;
76
77
78 private final RemoteCacheMonitor monitor;
79
80
81 private final String registry;
82
83
84 private boolean canFix = true;
85
86
87
88
89
90
91
92
93
94
95
96
97 protected RemoteCacheManager( final IRemoteCacheAttributes cattr, final ICompositeCacheManager cacheMgr,
98 final RemoteCacheMonitor monitor,
99 final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer)
100 {
101 this.cacheMgr = cacheMgr;
102 this.monitor = monitor;
103 this.cacheEventLogger = cacheEventLogger;
104 this.elementSerializer = elementSerializer;
105 this.remoteWatch = new CacheWatchRepairable();
106
107 this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());
108
109 try
110 {
111 lookupRemoteService();
112 }
113 catch (final IOException e)
114 {
115 log.error("Could not find server", e);
116
117
118 monitor.notifyError();
119 }
120 }
121
122
123
124
125
126
127 protected void lookupRemoteService() throws IOException
128 {
129 log.info( "Looking up server [{0}]", registry );
130 try
131 {
132 final Object obj = Naming.lookup( registry );
133 log.info( "Server found: {0}", obj );
134
135
136 this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
137 log.debug( "Remote Service = {0}", remoteService );
138 remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
139 }
140 catch ( final Exception ex )
141 {
142
143
144
145 this.remoteService = new ZombieCacheServiceNonLocal<>();
146 remoteWatch.setCacheWatch( new ZombieCacheWatch() );
147 throw new IOException( "Problem finding server at [" + registry + "]", ex );
148 }
149 }
150
151
152
153
154
155
156
157
158 public <K, V> void addRemoteCacheListener( final IRemoteCacheAttributes cattr, final IRemoteCacheListener<K, V> listener )
159 throws IOException
160 {
161 if ( cattr.isReceive() )
162 {
163 log.info( "The remote cache is configured to receive events from the remote server. "
164 + "We will register a listener. remoteWatch = {0} | IRemoteCacheListener = {1}"
165 + " | cacheName ", remoteWatch, listener, cattr.getCacheName() );
166
167 remoteWatch.addCacheListener( cattr.getCacheName(), listener );
168 }
169 else
170 {
171 log.info( "The remote cache is configured to NOT receive events from the remote server. "
172 + "We will NOT register a listener." );
173 }
174 }
175
176
177
178
179
180
181
182
183
184
185 public void removeRemoteCacheListener( final IRemoteCacheAttributes cattr )
186 throws IOException
187 {
188 final RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
189 if ( cache != null )
190 {
191 removeListenerFromCache(cache);
192 }
193 else
194 {
195 if ( cattr.isReceive() )
196 {
197 log.warn( "Trying to deregister Cache Listener that was never registered." );
198 }
199 else
200 {
201 log.debug( "Since the remote cache is configured to not receive, "
202 + "there is no listener to deregister." );
203 }
204 }
205 }
206
207
208 private void removeListenerFromCache(final RemoteCacheNoWait<?, ?> cache) throws IOException
209 {
210 final IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
211 log.debug( "Found cache for [{0}], deregistering listener.", cache::getCacheName);
212
213 remoteWatch.removeCacheListener(cache.getCacheName(), rc.getListener());
214 }
215
216
217
218
219
220
221
222
223
224
225
226 @SuppressWarnings("unchecked")
227 public <K, V> RemoteCacheNoWait<K, V> getCache( final IRemoteCacheAttributes cattr )
228 {
229
230 return (RemoteCacheNoWait<K, V>) caches.computeIfAbsent(cattr.getCacheName(),
231 key -> newRemoteCacheNoWait(cattr));
232 }
233
234
235
236
237
238
239
240 protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(final IRemoteCacheAttributes cattr)
241 {
242 final RemoteCacheNoWait<K, V> remoteCacheNoWait;
243
244
245 RemoteCacheListener<K, V> listener = null;
246 try
247 {
248 listener = new RemoteCacheListener<>( cattr, cacheMgr, elementSerializer );
249 addRemoteCacheListener( cattr, listener );
250 }
251 catch ( final IOException e )
252 {
253 log.error( "Problem adding listener. RemoteCacheListener = {0}",
254 listener, e );
255 }
256
257 @SuppressWarnings("unchecked")
258 final IRemoteCacheClient<K, V> remoteCacheClient =
259 new RemoteCache<>(cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor);
260 remoteCacheClient.setCacheEventLogger( cacheEventLogger );
261 remoteCacheClient.setElementSerializer( elementSerializer );
262
263 remoteCacheNoWait = new RemoteCacheNoWait<>( remoteCacheClient );
264 remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
265 remoteCacheNoWait.setElementSerializer( elementSerializer );
266
267 return remoteCacheNoWait;
268 }
269
270
271 public void release()
272 {
273 caches.forEach((name, cache) -> {
274 try
275 {
276 log.info("freeCache [{0}]", name);
277
278 removeListenerFromCache(cache);
279 cache.dispose();
280 }
281 catch ( final IOException ex )
282 {
283 log.error("Problem releasing {0}", name, ex);
284 }
285 });
286 caches.clear();
287 }
288
289
290
291
292 public void fixCaches()
293 {
294 if ( !canFix )
295 {
296 return;
297 }
298
299 log.info( "Fixing caches. ICacheServiceNonLocal {0} | IRemoteCacheObserver {1}",
300 remoteService, remoteWatch );
301
302 caches.values().stream()
303 .filter(cache -> cache.getStatus() == CacheStatus.ERROR)
304 .forEach(cache -> cache.fixCache(remoteService));
305
306 if ( log.isInfoEnabled() )
307 {
308 final String msg = "Remote connection to " + registry + " resumed.";
309 if ( cacheEventLogger != null )
310 {
311 cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
312 }
313 log.info( msg );
314 }
315 }
316
317
318
319
320
321
322
323 public boolean canFixCaches()
324 {
325 try
326 {
327 lookupRemoteService();
328 }
329 catch (final IOException e)
330 {
331 log.error("Could not find server", e);
332 canFix = false;
333 }
334
335 return canFix;
336 }
337 }