1 package org.apache.commons.jcs3.utils.threadpool;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.config.PropertySetter;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public class ThreadPoolManager
60 {
61
62 private static final Log log = LogManager.getLog( ThreadPoolManager.class );
63
64
65 private PoolConfiguration defaultConfig;
66
67
68 private PoolConfiguration defaultSchedulerConfig;
69
70
71 private static final String PROP_NAME_ROOT = "thread_pool";
72
73
74 private static final String DEFAULT_PROP_NAME_ROOT = "thread_pool.default";
75
76
77 private static final String PROP_NAME_SCHEDULER_ROOT = "scheduler_pool";
78
79
80 private static final String DEFAULT_PROP_NAME_SCHEDULER_ROOT = "scheduler_pool.default";
81
82
83
84
85
86 private static volatile Properties props;
87
88
89 private final ConcurrentHashMap<String, ExecutorService> pools;
90
91
92 private final ConcurrentHashMap<String, ScheduledExecutorService> schedulerPools;
93
94
95
96
97 private static class ThreadPoolManagerHolder
98 {
99 static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
100 }
101
102
103
104
105 private ThreadPoolManager()
106 {
107 this.pools = new ConcurrentHashMap<>();
108 this.schedulerPools = new ConcurrentHashMap<>();
109 configure();
110 }
111
112
113
114
115
116
117
118
119 public ExecutorService createPool( final PoolConfiguration config, final String threadNamePrefix)
120 {
121 return createPool(config, threadNamePrefix, Thread.NORM_PRIORITY);
122 }
123
124
125
126
127
128
129
130
131
132 public ExecutorService createPool( final PoolConfiguration config, final String threadNamePrefix, final int threadPriority )
133 {
134 BlockingQueue<Runnable> queue = null;
135 if ( config.isUseBoundary() )
136 {
137 log.debug( "Creating a Bounded Buffer to use for the pool" );
138 queue = new LinkedBlockingQueue<>(config.getBoundarySize());
139 }
140 else
141 {
142 log.debug( "Creating a non bounded Linked Queue to use for the pool" );
143 queue = new LinkedBlockingQueue<>();
144 }
145
146 final ThreadPoolExecutor pool = new ThreadPoolExecutor(
147 config.getStartUpSize(),
148 config.getMaximumPoolSize(),
149 config.getKeepAliveTime(),
150 TimeUnit.MILLISECONDS,
151 queue,
152 new DaemonThreadFactory(threadNamePrefix, threadPriority));
153
154
155 switch (config.getWhenBlockedPolicy())
156 {
157 case ABORT:
158 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
159 break;
160
161 case RUN:
162 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
163 break;
164
165 case WAIT:
166 throw new RuntimeException("POLICY_WAIT no longer supported");
167
168 case DISCARDOLDEST:
169 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
170 break;
171
172 default:
173 break;
174 }
175
176 pool.prestartAllCoreThreads();
177
178 return pool;
179 }
180
181
182
183
184
185
186
187
188
189 public ScheduledExecutorService createSchedulerPool( final PoolConfiguration config, final String threadNamePrefix, final int threadPriority )
190 {
191
192 return Executors.newScheduledThreadPool(
193 config.getMaximumPoolSize(),
194 new DaemonThreadFactory(threadNamePrefix, threadPriority));
195 }
196
197
198
199
200
201
202
203 public static ThreadPoolManager getInstance()
204 {
205 return ThreadPoolManagerHolder.INSTANCE;
206 }
207
208
209
210
211 public static void dispose()
212 {
213 for ( final Iterator<Map.Entry<String, ExecutorService>> i =
214 getInstance().pools.entrySet().iterator(); i.hasNext(); )
215 {
216 final Map.Entry<String, ExecutorService> entry = i.next();
217 try
218 {
219 entry.getValue().shutdownNow();
220 }
221 catch (final Throwable t)
222 {
223 log.warn("Failed to close pool {0}", entry.getKey(), t);
224 }
225 i.remove();
226 }
227
228 for ( final Iterator<Map.Entry<String, ScheduledExecutorService>> i =
229 getInstance().schedulerPools.entrySet().iterator(); i.hasNext(); )
230 {
231 final Map.Entry<String, ScheduledExecutorService> entry = i.next();
232 try
233 {
234 entry.getValue().shutdownNow();
235 }
236 catch (final Throwable t)
237 {
238 log.warn("Failed to close pool {0}", entry.getKey(), t);
239 }
240 i.remove();
241 }
242 }
243
244
245
246
247
248
249
250
251
252
253 public ExecutorService getExecutorService( final String name )
254 {
255 return pools.computeIfAbsent(name, key -> {
256 log.debug( "Creating pool for name [{0}]", key );
257 final PoolConfiguration config = loadConfig( PROP_NAME_ROOT + "." + key, defaultConfig );
258 return createPool( config, "JCS-ThreadPoolManager-" + key + "-" );
259 });
260 }
261
262
263
264
265
266
267
268
269
270
271 public ScheduledExecutorService getSchedulerPool( final String name )
272 {
273 return schedulerPools.computeIfAbsent(name, key -> {
274 log.debug( "Creating scheduler pool for name [{0}]", key );
275 final PoolConfiguration config = loadConfig( PROP_NAME_SCHEDULER_ROOT + "." + key,
276 defaultSchedulerConfig );
277 return createSchedulerPool( config, "JCS-ThreadPoolManager-" + key + "-", Thread.NORM_PRIORITY );
278 });
279 }
280
281
282
283
284
285
286 protected Set<String> getPoolNames()
287 {
288 return pools.keySet();
289 }
290
291
292
293
294
295
296
297 public static void setProps( final Properties props )
298 {
299 ThreadPoolManager.props = props;
300 }
301
302
303
304
305 private void configure()
306 {
307 log.debug( "Initializing ThreadPoolManager" );
308
309 if ( props == null )
310 {
311 log.warn( "No configuration settings found. Using hardcoded default values for all pools." );
312 props = new Properties();
313 }
314
315
316 defaultConfig = loadConfig( DEFAULT_PROP_NAME_ROOT, new PoolConfiguration() );
317 defaultSchedulerConfig = loadConfig( DEFAULT_PROP_NAME_SCHEDULER_ROOT, new PoolConfiguration() );
318 }
319
320
321
322
323
324
325
326
327 private static PoolConfiguration loadConfig( final String root, final PoolConfiguration defaultPoolConfiguration )
328 {
329 final PoolConfiguration config = defaultPoolConfiguration.clone();
330 PropertySetter.setProperties( config, props, root + "." );
331
332 log.debug( "{0} PoolConfiguration = {1}", root, config );
333
334 return config;
335 }
336 }