Package org.apache.commons.lang3.concurrent
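Provides support classes for multi-threaded programming. This package is intended to be an extension to java.util.concurrent. These classes are thread-safe.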
A group of classes deals with the correct creation and initialization of objects that are accessed by multiple threads. All these classes implement the ConcurrentInitializer interface, which provides just a single method:
public interface ConcurrentInitializer<T> {
    T get() throws ConcurrentException;
}
A ConcurrentInitializer produces an object. Calling its get() method returns the object managed by the initializer. Several implementations of the interface are available, addressing various use cases:
ConstantInitializer is a very straightforward implementation of the ConcurrentInitializer interface: an instance is passed an object when it is constructed, and its get() method simply returns this object. This is useful, for instance, in unit tests or when you want to pass a specific object to a component that expects a ConcurrentInitializer.
The LazyInitializer class can be used to defer the creation of an object until it is actually used. This makes sense, for instance, if the creation of the object is expensive and would slow down application startup, or if the object is needed only in special situations.
LazyInitializer implements the double-check idiom for an instance field as discussed in Joshua Bloch's "Effective Java", 2nd edition, item 71. It stores the object in a volatile field, so synchronization is only needed while the object has not yet been created. Note that this idiom is appropriate for instance fields only. For static fields there are superior alternatives, such as the lazy initialization holder class idiom described in the same item.
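For readers unfamiliar with the idiom, the following sketch shows the double-check pattern for an instance field in plain JDK code. It is a simplified illustration of the technique, not LazyInitializer's actual source. The class name DoubleCheckHolder and the use of a Supplier as the factory are assumptions made for this example. For simplicity it also assumes that the factory never returns null.

```java
import java.util.function.Supplier;

/** Hypothetical holder illustrating the double-check idiom for an instance field. */
final class DoubleCheckHolder<T> {
    private final Supplier<T> factory;
    // volatile guarantees that other threads see a fully constructed object
    private volatile T value;

    DoubleCheckHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = value;              // first check: a single volatile read, no lock
        if (result == null) {
            synchronized (this) {
                result = value;        // second check, now holding the lock
                if (result == null) {
                    value = result = factory.get();
                }
            }
        }
        return result;
    }
}
```

Once the object exists, every call takes the lock-free path. Threads that race past the first check are serialized by the lock, and the second check ensures that only one of them calls the factory.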
We provide an example use case to demonstrate the usage of this class: A server application uses multiple worker threads to process client requests. If such a request causes a fatal error, an administrator is to be notified using a special messaging service. We assume that creating the messaging service is an expensive operation, so it should only be performed if an error actually occurs. This is where LazyInitializer comes into play. We create a specialized subclass for creating and initializing an instance of our messaging service. LazyInitializer declares an abstract initialize() method, which we have to implement to create the messaging service object:
public class MessagingServiceInitializer extends LazyInitializer<MessagingService> {
    @Override
    protected MessagingService initialize() throws ConcurrentException {
        // Do all necessary steps to create and initialize the service object
        MessagingService service = ...
        return service;
    }
}
Now each server thread is passed a reference to a shared instance of our new MessagingServiceInitializer
class. The threads run in a loop processing client requests. If an error is detected, the messaging service is
obtained from the initializer, and the administrator is notified:
public class ServerThread implements Runnable {
    // The initializer for obtaining the messaging service.
    private final ConcurrentInitializer<MessagingService> initializer;

    public ServerThread(ConcurrentInitializer<MessagingService> init) {
        initializer = init;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // wait for request
                // process request
            } catch (FatalServerException ex) {
                // get messaging service
                try {
                    MessagingService svc = initializer.get();
                    svc.notifyAdministrator(ex);
                } catch (ConcurrentException cex) {
                    cex.printStackTrace();
                }
            }
        }
    }
}
The AtomicInitializer class is very similar to LazyInitializer. It serves the same purpose: to defer the creation of an object until it is needed. The internal structure is also similar, except that it relies on an atomic variable instead of synchronization. Again there is an abstract initialize() method, which has to be implemented by concrete subclasses in order to create and initialize the managed object. In fact, we can turn the MessagingServiceInitializer from the example above into an atomic initializer simply by changing its extends declaration so that it refers to AtomicInitializer<MessagingService> as the superclass.
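The practical difference from LazyInitializer shows up when several threads make the first call at the same time. The sketch below is a simplified illustration, not the library's source. It uses an AtomicReference and compareAndSet: racing threads may each run the factory, but only the first value that is successfully published is ever returned. The class name CasHolder and the Supplier-based factory are assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical lock-free lazy holder in the style of an atomic initializer. */
final class CasHolder<T> {
    private final Supplier<T> factory;
    private final AtomicReference<T> ref = new AtomicReference<>();

    CasHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result = ref.get();
        if (result == null) {
            // No lock: several racing threads may all call the factory here.
            T candidate = factory.get();
            // Only the first compareAndSet succeeds; the others discard their candidate.
            if (ref.compareAndSet(null, candidate)) {
                result = candidate;
            } else {
                result = ref.get();
            }
        }
        return result;
    }
}
```

The discarded candidates illustrate why this variant is only appropriate when creating the object is cheap and free of side effects.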
With AtomicSafeInitializer there is yet another variant implementing the lazy initialization pattern. Its implementation is close to AtomicInitializer; it also uses atomic variables internally and therefore does not need synchronization. The name "Safe" derives from an additional check which guarantees that the initialize() method is called only once. In this respect it behaves exactly like LazyInitializer.
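One way to add such a check without locks is to use a second atomic flag. The flag elects a single thread to run the factory, and the remaining threads spin until the value has been published. The sketch below illustrates this idea under stated assumptions: the class name SafeCasHolder, the Supplier factory, and spinning with Thread.yield() are all illustrative choices and are not the library's code. If the factory fails, the claim is released so that a later call can retry.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical lock-free lazy holder; the factory runs only once unless it fails. */
final class SafeCasHolder<T> {
    private final Supplier<T> factory;
    private final AtomicBoolean claimed = new AtomicBoolean();
    private final AtomicReference<T> ref = new AtomicReference<>();

    SafeCasHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T result;
        while ((result = ref.get()) == null) {
            // Exactly one thread wins the claim and runs the factory.
            if (claimed.compareAndSet(false, true)) {
                try {
                    ref.set(Objects.requireNonNull(factory.get(), "factory returned null"));
                } catch (RuntimeException | Error e) {
                    claimed.set(false); // release the claim so a later call can retry
                    throw e;
                }
            } else {
                Thread.yield(); // losers wait until the winner publishes the value
            }
        }
        return result;
    }
}
```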
Now, which of the lazy initializer implementations should you use? First of all, it is difficult to give general recommendations regarding the performance of these classes. The initializers use low-level functionality whose efficiency depends on multiple factors, including the target platform and the number of concurrent threads. Developers should therefore run their own benchmarks in scenarios close to their specific use cases. The following statements are rules of thumb which have to be verified in practice.
AtomicInitializer is probably the most efficient implementation, because it does without synchronization and further checks. Its main drawback is that the initialize() method can be called multiple times. In cases where this is not an issue (for instance, when creating the object is cheap and has no side effects), AtomicInitializer is a good choice.
AtomicSafeInitializer and LazyInitializer both guarantee that the initialization method is called only once. Because AtomicSafeInitializer does not use synchronization, it is probably slightly more efficient than LazyInitializer, but the concrete numbers might depend on the level of concurrency.
Another implementation of the ConcurrentInitializer interface is BackgroundInitializer. It is again an abstract base class with an initialize() method that has to be defined by concrete subclasses. The idea of BackgroundInitializer is that it calls the initialize() method in a separate worker thread. An application creates a background initializer and starts it. It can then continue with its work while the initializer runs in parallel. When the application needs the results of the initializer, it calls its get() method, which blocks until the initialization is complete. This is useful, for instance, at application startup: initialization steps such as reading configuration files or opening a database connection can run in background threads while the application shows a splash screen and constructs its UI.
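The start-then-get protocol can be illustrated with plain JDK classes. In the sketch below, start() submits the initialization to an ExecutorService, and get() blocks on the resulting Future. The class BackgroundTask is a hypothetical, simplified stand-in and not the library's BackgroundInitializer. In particular, it represents the initialization step as a Callable, uses a fresh single-thread executor for each task, and reports failures as IllegalStateException rather than ConcurrentException. All of these are assumptions made for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical, simplified background initializer built on JDK classes. */
final class BackgroundTask<T> {
    private final Callable<T> initializer;
    private Future<T> future; // guarded by this

    BackgroundTask(Callable<T> initializer) {
        this.initializer = initializer;
    }

    /** Starts the initialization in a worker thread; returns false if already started. */
    synchronized boolean start() {
        if (future != null) {
            return false;
        }
        ExecutorService executor = Executors.newSingleThreadExecutor();
        future = executor.submit(initializer);
        executor.shutdown(); // the submitted task still runs; the thread ends afterwards
        return true;
    }

    /** Blocks until the initialization is complete and returns its result. */
    T get() {
        Future<T> f;
        synchronized (this) {
            if (future == null) {
                throw new IllegalStateException("start() has not been called");
            }
            f = future;
        }
        try {
            return f.get();
        } catch (ExecutionException e) {
            throw new IllegalStateException("initialization failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```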
As a concrete example, consider an application that has to read the content of a URL (maybe a page with news) to be displayed to the user after login. Because loading the data over the network can take some time, a specialized implementation of BackgroundInitializer can be created for this purpose:
public class URLLoader extends BackgroundInitializer<String> {
    // The URL to be loaded.
    private final URL url;

    public URLLoader(URL u) {
        url = u;
    }

    @Override
    protected String initialize() throws ConcurrentException {
        // try-with-resources closes the stream even if reading fails
        try (InputStream in = url.openStream()) {
            // read content into string
            ...
            return content;
        } catch (IOException ioex) {
            throw new ConcurrentException(ioex);
        }
    }
}
An application creates an instance of URLLoader and starts it. It can then do other things. When it needs the content of the URL, it calls the initializer's get() method:
URL url = new URL("http://www.application-home-page.com/");
URLLoader loader = new URLLoader(url);
loader.start(); // this starts the background initialization
// do other stuff
...
// now obtain the content of the URL
String content;
try {
    content = loader.get(); // this may block
} catch (ConcurrentException cex) {
    content = "Error when loading URL " + url;
}
// display content
Related to BackgroundInitializer is the MultiBackgroundInitializer class. As the name implies, this class can handle multiple initializations in parallel. The basic usage scenario is as follows. First, a MultiBackgroundInitializer instance is created. Then an arbitrary number of BackgroundInitializer objects is added using the MultiBackgroundInitializer.addInitializer(String, BackgroundInitializer) method. When adding an initializer, a string has to be provided which is later used to obtain the result for this initializer. When all initializers have been added, the multi initializer's start() method (inherited from BackgroundInitializer.start()) is called. This starts processing of all initializers. Later the get() method can be called. It waits until all initializers have finished their initialization. get() returns an object of type MultiBackgroundInitializer.MultiBackgroundInitializerResults. This object provides information about all initializations that have been performed. It can be used to check whether a specific initializer was successful or threw an exception. Of course, all initialization results can be queried.
With MultiBackgroundInitializer we can extend our example to perform multiple initialization steps. Suppose that in addition to loading a web site we also want to create a JPA entity manager factory and read a configuration file. We assume that corresponding BackgroundInitializer implementations exist. The following example fragment shows the usage of MultiBackgroundInitializer for this purpose:
MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();
initializer.addInitializer("url", new URLLoader(url));
initializer.addInitializer("jpa", new JPAEMFInitializer());
initializer.addInitializer("config", new ConfigurationInitializer());
initializer.start(); // start background processing
// do other interesting things in parallel
...
// evaluate the results of background initialization
MultiBackgroundInitializer.MultiBackgroundInitializerResults results =
initializer.get();
String urlContent = (String) results.getResultObject("url");
EntityManagerFactory emf =
(EntityManagerFactory) results.getResultObject("jpa");
...
The child initializers are added to the multi initializer and are each assigned a unique name. The object returned by the get() method is then queried for the individual results using these unique names.
If background initializers (including MultiBackgroundInitializer) are created using the default constructor, they create their own ExecutorService, which is used behind the scenes to execute the worker tasks. It is also possible to pass in an ExecutorService when the initializer is constructed. That way client code can configure the ExecutorService according to its specific needs; for instance, the number of available threads could be limited.
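Expressed in plain JDK terms, named child initializers combined with a caller-supplied ExecutorService correspond roughly to submitting named Callable objects to a pool and then collecting their Futures by name. The sketch below is such an approximation and does not reflect MultiBackgroundInitializer's internals. The class NamedTasks, the method runAll, and the separate failures map (standing in for the per-initializer exception query) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical helper: runs named tasks on a caller-supplied executor. */
final class NamedTasks {
    /**
     * Submits every task, then waits for all of them. Successful results are
     * returned by name; failures are recorded by name in the failures map.
     */
    static Map<String, Object> runAll(Map<String, Callable<?>> tasks, ExecutorService exec,
                                      Map<String, Throwable> failures) {
        Map<String, Future<?>> futures = new LinkedHashMap<>();
        // submit everything first so that the tasks run in parallel
        tasks.forEach((name, task) -> futures.put(name, exec.submit(task)));

        Map<String, Object> results = new LinkedHashMap<>();
        for (Map.Entry<String, Future<?>> entry : futures.entrySet()) {
            try {
                results.put(entry.getKey(), entry.getValue().get()); // blocks
            } catch (ExecutionException ex) {
                failures.put(entry.getKey(), ex.getCause());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", ex);
            }
        }
        return results;
    }
}
```

Because the pool belongs to the caller, the caller decides its size (for example, Executors.newFixedThreadPool(2) to cap the number of worker threads) and shuts it down when done.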
Utility Classes
Another group of classes in the concurrent package offers some generic functionality related to concurrency. The ConcurrentUtils class provides a number of static utility methods. One focus of this class is dealing with exceptions thrown by JDK classes. Many JDK classes of the executor framework (for example, Future.get()) throw exceptions of type ExecutionException if something goes wrong.
The cause of these exceptions can also be a runtime exception or even an error. In typical Java programming you often do not want to deal with runtime exceptions directly; rather, you let them propagate up the call stack until they reach a central exception handler. Checked exceptions, in contrast, are usually handled close to where they occur. ExecutionException violates this principle: because it is a checked exception, an application is forced to handle it even if the cause is a runtime exception. So you typically have to inspect the cause of the ExecutionException and test whether it is a checked exception which has to be handled. If this is not the case, the causing exception can be rethrown.
The ConcurrentUtils.extractCause(java.util.concurrent.ExecutionException) method does this work for you. It is passed an ExecutionException and tests its cause. If this is an error or a runtime exception, it is rethrown directly. Otherwise, an instance of ConcurrentException is created and initialized with the cause (ConcurrentException is an exception class defined in the org.apache.commons.lang3.concurrent package). So if you get such a ConcurrentException, you can be sure that the original cause of the ExecutionException was a checked exception. For users who prefer runtime exceptions in general, there is also a ConcurrentUtils.extractCauseUnchecked(java.util.concurrent.ExecutionException) method, which behaves like extractCause() but returns the unchecked exception ConcurrentRuntimeException instead.
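The decision extractCause() makes can be sketched in a few lines of plain Java. The sketch substitutes a hypothetical checked exception, CheckedCause, for ConcurrentException so that it compiles without Commons Lang. The helper class CauseHandling is likewise hypothetical. The sketch illustrates the behavior described above and does not reproduce the library's code. It also returns null when there is no exception or no cause, which is an assumption of this sketch.

```java
import java.util.concurrent.ExecutionException;

/** Hypothetical checked exception standing in for ConcurrentException. */
final class CheckedCause extends Exception {
    private static final long serialVersionUID = 1L;

    CheckedCause(Throwable cause) {
        super(cause);
    }
}

final class CauseHandling {
    /** Rethrows unchecked causes directly and wraps checked ones. */
    static CheckedCause extractCause(ExecutionException ex) {
        if (ex == null || ex.getCause() == null) {
            return null; // nothing to extract
        }
        Throwable cause = ex.getCause();
        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause; // let runtime exceptions propagate
        }
        if (cause instanceof Error) {
            throw (Error) cause;            // errors propagate as well
        }
        return new CheckedCause(cause);     // only checked causes reach the caller
    }
}
```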
In addition to the extractCause() methods, there are corresponding ConcurrentUtils.handleCause(java.util.concurrent.ExecutionException) and ConcurrentUtils.handleCauseUnchecked(java.util.concurrent.ExecutionException) methods. These methods extract the cause of the passed-in ExecutionException and throw the resulting ConcurrentException or ConcurrentRuntimeException. This makes it easy to transform an ExecutionException into a ConcurrentException while letting unchecked causes propagate unchanged:
Future<Object> future = ...;
try {
    Object result = future.get();
    ...
} catch (ExecutionException eex) {
    ConcurrentUtils.handleCause(eex);
}
There is also some support for the concurrent initializers introduced earlier. The ConcurrentUtils.initialize() method is passed a ConcurrentInitializer object and returns the object created by this initializer. It is null-safe: if null is passed, it returns null. The initializeUnchecked() method works analogously, but a ConcurrentException thrown by the initializer is rethrown as a ConcurrentRuntimeException. This is especially useful if the specific ConcurrentInitializer does not throw checked exceptions.
Using initializeUnchecked(), the code for requesting the object of an initializer becomes less verbose. The direct invocation looks as follows:
ConcurrentInitializer<MyClass> initializer = ...;
try {
    MyClass obj = initializer.get();
    // do something with obj
} catch (ConcurrentException cex) {
    // exception handling
}
Using the ConcurrentUtils.initializeUnchecked(ConcurrentInitializer) method, this becomes:
ConcurrentInitializer<MyClass> initializer = ...;
MyClass obj = ConcurrentUtils.initializeUnchecked(initializer);
// do something with obj
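Both helpers amount to a null check plus an exception translation. The sketch below models this behavior with a hypothetical CheckedInitializer interface in place of ConcurrentInitializer and an IllegalStateException in place of ConcurrentRuntimeException, so that it runs on the JDK alone. The names InitHelpers and CheckedInitializer are illustrative, not library API.

```java
/** Hypothetical stand-in for ConcurrentInitializer, declaring a checked exception. */
interface CheckedInitializer<T> {
    T get() throws Exception;
}

final class InitHelpers {
    /** Null-safe: returns null when no initializer is given. */
    static <T> T initializeUnchecked(CheckedInitializer<T> initializer) {
        if (initializer == null) {
            return null;
        }
        try {
            return initializer.get();
        } catch (RuntimeException e) {
            throw e; // unchecked exceptions pass through unchanged
        } catch (Exception e) {
            // checked exceptions are wrapped in an unchecked exception
            throw new IllegalStateException(e);
        }
    }
}
```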
Another utility class deals with the creation of threads. When using the Executor framework introduced in JDK 1.5, the developer usually does not have to care about creating threads; the executors create the threads they need on demand. However, it is sometimes desirable to set some properties of the newly created worker threads. This is possible through the ThreadFactory interface: an implementation of this interface has to be created and passed to an executor at creation time. The JDK only offers a basic default implementation, returned by Executors.defaultThreadFactory(), which has no configuration options. So for custom settings one has to start from scratch.
With BasicThreadFactory, Commons Lang provides an implementation of ThreadFactory that works out of the box for many common use cases. For instance, it is possible to set a naming pattern for the new threads, set the daemon flag and a priority, or install a handler for uncaught exceptions. Instances of BasicThreadFactory are created and configured using the nested BasicThreadFactory.Builder class. The following example shows a typical usage scenario:
BasicThreadFactory factory = new BasicThreadFactory.Builder()
    .namingPattern("worker-thread-%d")
    .daemon(true)
    .uncaughtExceptionHandler(myHandler)
    .build();
ExecutorService exec = Executors.newSingleThreadExecutor(factory);
The nested BasicThreadFactory.Builder class defines some methods for configuring the new BasicThreadFactory instance. BasicThreadFactory objects are immutable, so these attributes cannot be changed later. The naming pattern is a string which can be passed to String.format(java.util.Locale, String, Object...). The placeholder %d is replaced by an increasing counter value. An instance can wrap another ThreadFactory implementation; this is achieved by calling the builder's wrappedFactory(ThreadFactory) method. The wrapped factory is then used to create new threads, and the specific attributes are applied to each new thread afterwards. If no wrapped factory is set, the default factory provided by the JDK (Executors.defaultThreadFactory()) is used.
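The wrapping behavior can be sketched with JDK classes only. The hypothetical NamingThreadFactory below delegates thread creation to a wrapped factory, falling back to Executors.defaultThreadFactory(). It then applies a naming pattern, formatted with String.format and an increasing counter, along with the daemon flag. It mirrors the description above but is not BasicThreadFactory's code. The class name, its constructor parameters, and the counter starting at 1 are assumptions.

```java
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical configurable factory that wraps another ThreadFactory. */
final class NamingThreadFactory implements ThreadFactory {
    private final ThreadFactory wrapped;
    private final String namingPattern;
    private final boolean daemon;
    private final AtomicLong counter = new AtomicLong();

    NamingThreadFactory(ThreadFactory wrapped, String namingPattern, boolean daemon) {
        // fall back to the JDK's default factory when nothing is wrapped
        this.wrapped = wrapped != null ? wrapped : Executors.defaultThreadFactory();
        this.namingPattern = namingPattern;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = wrapped.newThread(r);        // the wrapped factory creates the thread
        long n = counter.incrementAndGet();     // 1, 2, 3, ...
        t.setName(String.format(Locale.ROOT, namingPattern, n));
        t.setDaemon(daemon);                    // attributes are applied afterwards
        return t;
    }
}
```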
Synchronization objects
The concurrent package also provides some support for specific synchronization problems with threads.
TimedSemaphore allows restricted access to a resource in a given time frame. As with a regular semaphore, a number of permits can be acquired. What is new is that the available permits are tied to a given time unit. For instance, the timed semaphore can be configured to allow 10 permits per second. Multiple threads access the semaphore and call its TimedSemaphore.acquire() method. The semaphore keeps track of the number of permits granted in the current time frame. Only 10 calls are allowed; any further callers are blocked until the time frame (one second in this example) is over. Then the counter of granted permits is reset to 0, and the blocked threads can compete for the permits of the new time frame. So the game can start anew.
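The fixed-window counting just described can be sketched with an intrinsic lock. In the hypothetical WindowLimiter below, the end of a time frame is signaled by calling endOfPeriod(). In practice a timer would trigger this call; TimedSemaphore itself relies on a ScheduledExecutorService for that purpose. The class and method names are assumptions, and the sketch omits statistics and shutdown handling.

```java
/** Hypothetical fixed-window limiter: at most 'limit' acquisitions per period. */
final class WindowLimiter {
    private int limit;
    private int acquired; // permits granted in the current period

    WindowLimiter(int limit) {
        this.limit = limit;
    }

    /** Blocks while all permits of the current period are used up. */
    synchronized void acquire() throws InterruptedException {
        while (acquired >= limit) {
            wait(); // woken up by endOfPeriod() or setLimit()
        }
        acquired++;
    }

    /** Non-blocking variant: returns false if no permit is left in this period. */
    synchronized boolean tryAcquire() {
        if (acquired >= limit) {
            return false;
        }
        acquired++;
        return true;
    }

    /** Called once per period, e.g. by a timer: resets the counter and wakes waiters. */
    synchronized void endOfPeriod() {
        acquired = 0;
        notifyAll(); // waiters re-check and compete for the new period's permits
    }

    synchronized void setLimit(int limit) {
        this.limit = limit;
        notifyAll(); // a raised limit may admit waiting threads immediately
    }
}
```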
What are use cases for TimedSemaphore? One example is to artificially limit the load produced by multiple threads. Consider a batch application accessing a database to extract statistical data. The application runs multiple threads which issue database queries in parallel and perform some calculation on the results. If the database to be processed is huge and is also used by a production system, multiple factors have to be balanced. On the one hand, the statistical evaluation should not take too long. You will therefore probably use a larger number of threads, because a thread spends most of its lifetime waiting for the database to return query results. On the other hand, the load that all these threads generate on the database should be limited so that the responsiveness of the production system is not affected. A TimedSemaphore object can achieve this. The semaphore can be configured to allow, for example, 100 queries per second. After these queries have been sent to the database, the threads have to wait until the second is over; then they can query again. By fine-tuning the limit enforced by the semaphore, a good balance between performance and database load can be established. It is even possible to change the number of available permits at runtime, so this number can be reduced during typical working hours and increased at night.
The following code examples demonstrate parts of the implementation of such a scenario. First, the batch application has to create an instance of TimedSemaphore and initialize its properties with default values:
TimedSemaphore semaphore = new TimedSemaphore(1, TimeUnit.SECONDS, 100);
Here we specify that the semaphore should allow 100 permits in one second. This is effectively the limit of database queries per second in our example use case. Next, the worker threads that issue database queries and perform statistical operations can be initialized. They are passed a reference to the semaphore at creation time. Before executing a query, they have to acquire a permit.
public class StatisticsTask implements Runnable {
    // The semaphore for limiting database load.
    private final TimedSemaphore semaphore;

    public StatisticsTask(TimedSemaphore sem, Connection con) {
        semaphore = sem;
        ...
    }

    // The main processing method. Executes queries and evaluates their results.
    @Override
    public void run() {
        try {
            while (!isDone()) {
                semaphore.acquire(); // enforce the load limit
                executeAndEvaluateQuery();
            }
        } catch (InterruptedException iex) {
            // restore the interrupt flag so the owner of the thread can react
            Thread.currentThread().interrupt();
        }
    }
}
The important line here is the call to semaphore.acquire(). If the limit of permits for the current time frame has not yet been reached, the call returns immediately. Otherwise, it blocks until the end of the time frame.
The last piece missing is a scheduler service which adapts the number of permits allowed by the semaphore according
to the time of day. We assume that this service is pretty simple and knows only two different time slots: working
shift and night shift. The service is triggered periodically. It then determines the current time slot and configures
the timed semaphore accordingly.
public class SchedulerService {
    // The semaphore for limiting database load.
    private final TimedSemaphore semaphore;
    ...

    // Configures the timed semaphore based on the current time of day. This method is called periodically.
    public void configureTimedSemaphore() {
        int limit;
        if (isWorkshift()) {
            limit = 50; // low database load
        } else {
            limit = 250; // high database load
        }
        semaphore.setLimit(limit);
    }
}
The TimedSemaphore.setLimit(int) method changes the number of permits allowed per time frame. Other methods, such as getAcquireCount() or getAvailablePermits(), query the internal state of a timed semaphore. Some statistical data is also available, for example the average number of acquire() calls per time frame. When a timed semaphore is no longer needed, its shutdown() method has to be called.
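The periodic trigger that SchedulerService relies on could come from the JDK's ScheduledExecutorService. Several details in the sketch below are assumptions, because the example above leaves isWorkshift() unspecified: the working hours (08:00 to 18:00), the one-minute interval, and the IntConsumer callback. The class LimitScheduler and its methods are hypothetical. The limits 50 and 250 are taken from the example.

```java
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

/** Hypothetical periodic re-configuration of a permit limit. */
final class LimitScheduler {
    private static final LocalTime SHIFT_START = LocalTime.of(8, 0);
    private static final LocalTime SHIFT_END = LocalTime.of(18, 0);

    /** Chooses the permit limit for the given time of day. */
    static int limitFor(LocalTime time) {
        boolean workShift = !time.isBefore(SHIFT_START) && time.isBefore(SHIFT_END);
        return workShift ? 50 : 250; // low load during the day, high load at night
    }

    /** Re-evaluates the limit every minute; the caller must shut the scheduler down. */
    static ScheduledExecutorService start(IntConsumer setLimit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> setLimit.accept(limitFor(LocalTime.now())), 0, 1, TimeUnit.MINUTES);
        return scheduler;
    }
}
```

With a timed semaphore, the callback would be the method reference semaphore::setLimit. At the end of the batch run, both the returned scheduler and the semaphore would be shut down.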
Class and interface summary
AbstractCircuitBreaker: Base class for circuit breakers.
AbstractCircuitBreaker.State: An internal enumeration representing the different states of a circuit breaker.
AbstractConcurrentInitializer<T, E extends Exception>: Abstracts and defines operations for ConcurrentInitializer implementations.
AbstractConcurrentInitializer.AbstractBuilder<I extends AbstractConcurrentInitializer<T, E>, T, B extends AbstractConcurrentInitializer.AbstractBuilder<I, T, B, E>, E extends Exception>: Builds a new instance for subclasses.
AbstractFutureProxy: Proxies to a Future for subclassing.
AtomicInitializer: A specialized implementation of the ConcurrentInitializer interface based on an AtomicReference variable.
AtomicInitializer.Builder: Builds a new instance.
AtomicSafeInitializer: A specialized ConcurrentInitializer implementation which is similar to AtomicInitializer, but ensures that the AbstractConcurrentInitializer.initialize() method is called only once.
AtomicSafeInitializer.Builder: Builds a new instance.
BackgroundInitializer: A class that allows complex initialization operations in a background task.
BackgroundInitializer.Builder: Builds a new instance.
BasicThreadFactory: An implementation of the ThreadFactory interface that provides some configuration options for the threads it creates.
BasicThreadFactory.Builder: A builder class for creating instances of BasicThreadFactory.
CallableBackgroundInitializer: A specialized BackgroundInitializer implementation that wraps a Callable object.
CircuitBreaker: An interface describing a Circuit Breaker component.
CircuitBreakingException: An exception class used for reporting runtime error conditions related to circuit breakers.
Computable<I,O>: Definition of an interface for a wrapper around a calculation that takes a single parameter and returns a result.
ConcurrentException: An exception class used for reporting error conditions related to accessing data of background tasks.
ConcurrentInitializer: Defines the thread-safe initialization of objects.
ConcurrentRuntimeException: An exception class used for reporting runtime error conditions related to accessing data of background tasks.
ConcurrentUtils: A utility class providing functionality related to the java.util.concurrent package.
ConstantInitializer: A very simple implementation of the ConcurrentInitializer interface which always returns the same object.
EventCountCircuitBreaker: A simple implementation of the Circuit Breaker pattern that counts specific events.
FutureTasks: Consists of utility methods that work with FutureTask.
LazyInitializer: This class provides a generic implementation of the lazy initialization pattern.
LazyInitializer.Builder: Builds a new instance.
Memoizer<I,O>: Definition of an interface for a wrapper around a calculation that takes a single parameter and returns a result.
MultiBackgroundInitializer: A specialized BackgroundInitializer implementation that can deal with multiple background initialization tasks.
MultiBackgroundInitializer.MultiBackgroundInitializerResults: A data class for storing the results of the background initialization performed by MultiBackgroundInitializer.
ThresholdCircuitBreaker: A simple implementation of the Circuit Breaker pattern that opens if the requested increment amount is greater than a given threshold.
TimedSemaphore: A specialized semaphore implementation that provides a number of permits in a given time frame.
UncheckedExecutionException: Unchecked ExecutionException.
UncheckedFuture: A Future that throws unchecked instead of checked exceptions.
UncheckedTimeoutException: Unchecked TimeoutException.