1 package org.apache.commons.jcs3.engine;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29 import org.apache.commons.jcs3.engine.behavior.ICacheElement;
30 import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
31 import org.apache.commons.jcs3.log.Log;
32 import org.apache.commons.jcs3.log.LogManager;
33 import org.apache.commons.jcs3.utils.timing.ElapsedTimer;
34
35
36
37
38
39
40
41
42
43
44 public class ZombieCacheServiceNonLocal<K, V>
45 extends ZombieCacheService<K, V>
46 implements ICacheServiceNonLocal<K, V>
47 {
48
49 private static final Log log = LogManager.getLog( ZombieCacheServiceNonLocal.class );
50
51
52 private int maxQueueSize;
53
54
55 private final ConcurrentLinkedQueue<ZombieEvent> queue;
56
57
58
59
60 public ZombieCacheServiceNonLocal()
61 {
62 queue = new ConcurrentLinkedQueue<>();
63 }
64
65
66
67
68
69
70 public ZombieCacheServiceNonLocal( final int maxQueueSize )
71 {
72 this();
73 this.maxQueueSize = maxQueueSize;
74 }
75
76
77
78
79
80
81 public int getQueueSize()
82 {
83 return queue.size();
84 }
85
86 private void addQueue(final ZombieEvent event)
87 {
88 queue.add(event);
89 if (queue.size() > maxQueueSize)
90 {
91 queue.poll();
92 }
93 }
94
95
96
97
98
99
100
101 @Override
102 public void update( final ICacheElement<K, V> item, final long listenerId )
103 {
104 if ( maxQueueSize > 0 )
105 {
106 final PutEvent<K, V> event = new PutEvent<>( item, listenerId );
107 addQueue( event );
108 }
109
110 }
111
112
113
114
115
116
117
118
119 @Override
120 public void remove( final String cacheName, final K key, final long listenerId )
121 {
122 if ( maxQueueSize > 0 )
123 {
124 final RemoveEvent<K> event = new RemoveEvent<>( cacheName, key, listenerId );
125 addQueue( event );
126 }
127
128 }
129
130
131
132
133
134
135
136 @Override
137 public void removeAll( final String cacheName, final long listenerId )
138 {
139 if ( maxQueueSize > 0 )
140 {
141 final RemoveAllEvent event = new RemoveAllEvent( cacheName, listenerId );
142 addQueue( event );
143 }
144
145 }
146
147
148
149
150
151
152
153
154
155
156 @Override
157 public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
158 throws IOException
159 {
160
161 return null;
162 }
163
164
165
166
167
168
169
170
171
172
173 @Override
174 public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
175 throws IOException
176 {
177 return Collections.emptyMap();
178 }
179
180
181
182
183
184
185
186 @Override
187 public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
188 {
189 return new HashMap<>();
190 }
191
192
193
194
195
196
197
198 @Override
199 public Set<K> getKeySet( final String cacheName )
200 {
201 return Collections.emptySet();
202 }
203
204
205
206
207
208
209
210 public synchronized void propagateEvents( final ICacheServiceNonLocal<K, V> service )
211 throws Exception
212 {
213 int cnt = 0;
214 log.info( "Propagating events to the new ICacheServiceNonLocal." );
215 final ElapsedTimer timer = new ElapsedTimer();
216 while ( !queue.isEmpty() )
217 {
218 cnt++;
219
220
221 final ZombieEvent event = queue.poll();
222
223 if ( event instanceof PutEvent )
224 {
225 @SuppressWarnings("unchecked")
226 final
227 PutEvent<K, V> putEvent = (PutEvent<K, V>) event;
228 service.update( putEvent.element, event.requesterId );
229 }
230 else if ( event instanceof RemoveEvent )
231 {
232 @SuppressWarnings("unchecked")
233 final
234 RemoveEvent<K> removeEvent = (RemoveEvent<K>) event;
235 service.remove( event.cacheName, removeEvent.key, event.requesterId );
236 }
237 else if ( event instanceof RemoveAllEvent )
238 {
239 service.removeAll( event.cacheName, event.requesterId );
240 }
241 }
242 log.info( "Propagated {0} events to the new ICacheServiceNonLocal in {1}",
243 cnt, timer.getElapsedTimeString() );
244 }
245
246
247
248
249 protected static abstract class ZombieEvent
250 {
251
252 String cacheName;
253
254
255 long requesterId;
256 }
257
258
259
260
261 private static class PutEvent<K, V>
262 extends ZombieEvent
263 {
264
265 final ICacheElement<K, V> element;
266
267
268
269
270
271
272 public PutEvent( final ICacheElement<K, V> element, final long requesterId )
273 {
274 this.requesterId = requesterId;
275 this.element = element;
276 }
277 }
278
279
280
281
282 private static class RemoveEvent<K>
283 extends ZombieEvent
284 {
285
286 final K key;
287
288
289
290
291
292
293
294 public RemoveEvent( final String cacheName, final K key, final long requesterId )
295 {
296 this.cacheName = cacheName;
297 this.requesterId = requesterId;
298 this.key = key;
299 }
300 }
301
302
303
304
305 private static class RemoveAllEvent
306 extends ZombieEvent
307 {
308
309
310
311
312 public RemoveAllEvent( final String cacheName, final long requesterId )
313 {
314 this.cacheName = cacheName;
315 this.requesterId = requesterId;
316 }
317 }
318 }