1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.jcs3.jcache.extras.writer;
20
21 import javax.cache.Cache;
22 import javax.cache.configuration.Factory;
23 import javax.cache.integration.CacheWriter;
24 import javax.cache.integration.CacheWriterException;
25 import java.io.Closeable;
26 import java.io.IOException;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.logging.Level;
34 import java.util.logging.Logger;
35
36 public class AsyncCacheWriter<K, V> implements CacheWriter<K, V>, Closeable, Factory<CacheWriter<K, V>>
37 {
38 private static final Logger LOGGER = Logger.getLogger(AsyncCacheWriter.class.getName());
39
40 private final CacheWriter<K, V> writer;
41 private final ExecutorService pool;
42
43 public AsyncCacheWriter(final CacheWriter<K, V> delegate, final int poolSize)
44 {
45 writer = delegate;
46 pool = Executors.newFixedThreadPool(
47 poolSize, new DaemonThreadFactory(delegate.getClass().getName() + "-" + delegate.hashCode() + "-"));
48 }
49
50 @Override
51 public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException
52 {
53 pool.submit(new ExceptionProtectionRunnable()
54 {
55 @Override
56 public void doRun()
57 {
58 writer.write(entry);
59 }
60 });
61 }
62
63 @Override
64 public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException
65 {
66 pool.submit(new ExceptionProtectionRunnable()
67 {
68 @Override
69 public void doRun()
70 {
71 writer.writeAll(entries);
72 }
73 });
74 }
75
76 @Override
77 public void delete(final Object key) throws CacheWriterException
78 {
79 pool.submit(new ExceptionProtectionRunnable()
80 {
81 @Override
82 public void doRun()
83 {
84 writer.delete(key);
85 }
86 });
87 }
88
89 @Override
90 public void deleteAll(final Collection<?> keys) throws CacheWriterException
91 {
92 pool.submit(new ExceptionProtectionRunnable()
93 {
94 @Override
95 public void doRun()
96 {
97 writer.deleteAll(keys);
98 }
99 });
100 }
101
102 @Override
103 public void close() throws IOException
104 {
105 final List<Runnable> runnables = pool.shutdownNow();
106 for (final Runnable r : runnables)
107 {
108 r.run();
109 }
110 }
111
112 @Override
113 public CacheWriter<K, V> create()
114 {
115 return this;
116 }
117
118
119 private static class DaemonThreadFactory implements ThreadFactory
120 {
121 private final AtomicInteger index = new AtomicInteger(1);
122 private final String prefix;
123
124 public DaemonThreadFactory(final String prefix)
125 {
126 this.prefix = prefix;
127 }
128
129 @Override
130 public Thread newThread( final Runnable runner )
131 {
132 final Thread t = new Thread( runner );
133 t.setName(prefix + index.getAndIncrement());
134 t.setDaemon(true);
135 return t;
136 }
137 }
138
139 private static abstract class ExceptionProtectionRunnable implements Runnable
140 {
141 @Override
142 public void run()
143 {
144 try
145 {
146 doRun();
147 }
148 catch (final Exception e)
149 {
150 LOGGER.log(Level.SEVERE, e.getMessage(), e);
151 }
152 }
153
154 protected abstract void doRun();
155 }
156 }