Java Concurrency in Practice Notes: Applying Thread Pools

This chapter looks at advanced options for configuring and tuning thread pools, describes hazards to watch for when using the task execution framework, and offers some more advanced examples of using Executor.

Applying Thread Pools

Implicit couplings between tasks and execution policies

While the Executor framework offers substantial flexibility in specifying and modifying execution policies, not all tasks are compatible with all execution policies. Types of tasks that require specific execution policies include:

  • Dependent tasks
  • Tasks that exploit thread confinement
  • Response-time-sensitive tasks
  • Tasks that use ThreadLocal

Thread pools work best when tasks are homogeneous and independent. Mixing long-running and short-running tasks risks “clogging” the pool unless it is very large; submitting tasks that depend on other tasks risks deadlock unless the pool is unbounded.

Some tasks have characteristics that require or preclude a specific execution policy. Tasks that depend on other tasks require that the thread pool be large enough that tasks are never queued or rejected; tasks that exploit thread confinement require sequential execution. Document these requirements so that future maintainers do not undermine safety or liveness by substituting an incompatible execution policy.

Thread starvation deadlock

If all threads are executing tasks that are blocked waiting for other tasks still on the work queue, the result is thread starvation deadlock. It can occur whenever a pool task initiates an unbounded blocking wait for some resource or condition that can succeed only through the action of another pool task.

public class ThreadDeadlock {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    public class LoadFileTask implements Callable<String> {
        private final String fileName;

        public LoadFileTask(String fileName) {
            this.fileName = fileName;
        }

        public String call() throws Exception {
            // Here's where we would actually read the file
            return "";
        }
    }

    public class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header, footer;
            header = exec.submit(new LoadFileTask("header.html"));
            footer = exec.submit(new LoadFileTask("footer.html"));
            String page = renderBody();
            // Will deadlock -- task waiting for result of subtask
            return header.get() + page + footer.get();
        }

        private String renderBody() {
            // Here's where we would actually render the page
            return "";
        }
    }
}

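ThreadDeadlock illustrates thread starvation deadlock: RenderPageTask submits two subtasks to the same single-threaded Executor and then waits for their results, but the subtasks cannot start until RenderPageTask itself completes.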

Whenever you submit to an Executor tasks that are not independent, beware of the possibility of thread starvation deadlock, and document any pool sizing or configuration constraints in the code or configuration file where the Executor is configured.
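The following is a minimal sketch, not taken from the book, of how the problem can be observed without hanging the program. The class and method names are hypothetical. A task waits for its own subtask with a timed get; with one pool thread the subtask never gets to run, while with two threads it completes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StarvationDemo {
    /** Returns true if a task timed out waiting for a subtask submitted to the same pool. */
    static boolean subtaskStarves(int poolThreads, final long timeoutMillis) throws Exception {
        final ExecutorService exec = Executors.newFixedThreadPool(poolThreads);
        try {
            Callable<Boolean> outer = () -> {
                Callable<String> loadHeader = () -> "header";
                // The subtask goes onto the same pool the outer task is running in
                Future<String> header = exec.submit(loadHeader);
                try {
                    header.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false; // the subtask found a free thread
                } catch (TimeoutException e) {
                    return true;  // an untimed get() would have blocked forever
                }
            };
            return exec.submit(outer).get();
        } finally {
            exec.shutdownNow();
        }
    }
}
```

Calling subtaskStarves(1, 200) reports starvation, whereas subtaskStarves(2, 2000) does not, which is the kind of sizing constraint worth documenting next to the Executor configuration.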

Long-running tasks

Thread pools can have responsiveness problems if tasks can block for extended periods of time, even if deadlock is not a possibility. A thread pool can become clogged with long-running tasks, increasing the service time even for short tasks.

One technique that can mitigate the ill effects of long-running tasks is for tasks to use timed resource waits instead of unbounded waits. Most blocking methods in the platform libraries come in both untimed and timed versions, such as Thread.join, BlockingQueue.put (whose timed counterpart is offer), CountDownLatch.await, and Selector.select.
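As a small illustration of a timed wait, the sketch below uses BlockingQueue.poll with a timeout in place of the untimed take. The class name, method name, and fallback value are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedWaitExample {
    /** Waits at most timeoutMillis for an item and returns fallback if none arrives. */
    static String takeOrFallback(BlockingQueue<String> queue, long timeoutMillis, String fallback)
            throws InterruptedException {
        // poll(timeout, unit) is the timed counterpart of the untimed take()
        String item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        return (item != null) ? item : fallback;
    }
}
```

When the wait times out, the task can fail or requeue itself, which frees the pool thread for work that may complete sooner.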

Sizing thread pools

The ideal size for a thread pool depends on the types of tasks that will be submitted and the characteristics of the deployment system. Thread pool sizes should rarely be hard-coded; instead pool sizes should be provided by a configuration mechanism or computed dynamically by consulting Runtime.availableProcessors.

For compute-intensive tasks, an Ncpu-processor system usually achieves optimal utilization with a thread pool of Ncpu + 1 threads. For tasks that also include I/O or other blocking operations, you want a larger pool, since not all of the threads will be schedulable at all times.

Given these definitions:

Ncpu = number of CPUs
Ucpu = target CPU utilization, 0 < Ucpu <= 1
W/C = ratio of wait time to compute time

The optimal pool size for keeping the processors at the desired utilization is:

Nthreads = Ncpu * Ucpu * (1 + W/C)
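
The formula above can be sketched in Java as follows. The class and method names are hypothetical, and rounding up to a whole thread is an assumption of this sketch rather than part of the formula.

```java
final class PoolSizing {
    /** Nthreads = Ncpu * Ucpu * (1 + W/C), rounded up (an assumption) and never less than 1. */
    static int poolSize(int nCpu, double targetUtilization, double waitToComputeRatio) {
        if (nCpu < 1 || targetUtilization <= 0 || targetUtilization > 1 || waitToComputeRatio < 0)
            throw new IllegalArgumentException("invalid sizing parameters");
        return Math.max(1, (int) Math.ceil(nCpu * targetUtilization * (1 + waitToComputeRatio)));
    }

    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        // Hypothetical workload: tasks wait three times as long as they compute
        System.out.println(poolSize(nCpu, 0.5, 3.0));
    }
}
```

For example, with 8 CPUs, a target utilization of 0.5, and W/C = 3, the formula gives 8 * 0.5 * 4 = 16 threads.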

Configuring ThreadPoolExecutor

ThreadPoolExecutor provides the base implementation for the executors returned by the newCachedThreadPool, newFixedThreadPool, and newScheduledThreadPool factories in Executors. If the default execution policy does not meet your needs, you can instantiate a ThreadPoolExecutor through its constructor and customize it as you see fit.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit timeUnit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { /*...*/ }

Thread creation and teardown

The core pool size, maximum pool size, and keep-alive time govern thread creation and teardown.

  • The core size is the target size: the implementation attempts to maintain the pool at this size even when there are no tasks to execute, and will not create more threads than this unless the work queue is full
  • The maximum pool size is the upper bound on how many pool threads can be active at once
  • A thread that has been idle for longer than keep-alive time becomes a candidate for reaping and can be terminated if the current pool size exceeds the core size
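
The interplay between core size, maximum size, and the work queue can be observed with a small experiment. This is a sketch with hypothetical names, using a pool with core size 1, maximum size 2, and a one-slot queue; every submitted task blocks until the measurement has been taken.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGrowthDemo {
    /** Submits the given number of blocking tasks (at most 3) and returns the resulting pool size. */
    static int poolSizeAfterSubmitting(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        final CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

One task starts the single core thread, a second task is queued without creating a thread, and only the third task, arriving when the queue is full, causes the pool to grow to its maximum of two. A fourth task would be rejected under the default saturation policy.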

Managing queued tasks

In Chapter 6 we saw how unbounded thread creation could lead to instability, and addressed this problem by using a fixed-size thread pool instead of creating a new thread for every request. However, this is only a partial solution; it is still possible for the application to run out of resources under heavy load, just harder.

If the arrival rate of new requests exceeds the rate at which they can be handled, requests will still queue up. With a thread pool, they wait in a queue of Runnables managed by the Executor instead of queueing up as threads contending for the CPU. Representing a waiting task with a Runnable and a list node is certainly a lot cheaper than with a thread, but the risk of resource exhaustion still remains if clients can throw requests at the server faster than it can handle them.

ThreadPoolExecutor allows you to supply a BlockingQueue to hold tasks awaiting execution. There are three basic approaches to task queueing: unbounded queue, bounded queue, and synchronous handoff.
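
The three approaches correspond to three kinds of BlockingQueue. The sketch below shows one plausible way to build each variant; the class and method names are hypothetical and the keep-alive values are arbitrary choices for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class QueueChoices {
    /** Unbounded queue: tasks pile up without limit when all n threads are busy. */
    static ThreadPoolExecutor unbounded(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Bounded queue: at most `capacity` tasks wait; beyond that the saturation policy applies. */
    static ThreadPoolExecutor bounded(int n, int capacity) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(capacity));
    }

    /** Synchronous handoff: no task is ever stored; each one goes straight to a thread. */
    static ThreadPoolExecutor handoff(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}
```

A SynchronousQueue has no capacity at all, so it only makes sense when the pool can grow or when rejecting excess tasks is acceptable.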

The newCachedThreadPool factory is a good default choice for an Executor, providing better queueing performance than a fixed thread pool. A fixed-size thread pool is a good choice when you need to limit the number of concurrent tasks for resource-management purposes, as in a server application that accepts requests from network clients and would otherwise be vulnerable to overload.

Saturation policies

When a bounded work queue fills up, the saturation policy comes into play. The saturation policy for a ThreadPoolExecutor can be modified by calling setRejectedExecutionHandler.

Several implementations of RejectedExecutionHandler are provided, each implementing a different saturation policy:

  • AbortPolicy: the default; causes execute to throw the unchecked RejectedExecutionException, which the caller can catch to implement its own overflow handling as it sees fit
  • CallerRunsPolicy: implements a form of throttling that neither discards tasks nor throws an exception, but instead tries to slow down the flow of new tasks by pushing some of the work back to the caller
  • DiscardPolicy: silently discards the newly submitted task if it cannot be queued for execution
  • DiscardOldestPolicy: discards the task that would otherwise be executed next and tries to resubmit the new task
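
To make the caller-runs behavior concrete, here is a minimal sketch with hypothetical names. It saturates a one-thread pool with a one-slot queue and then reports which thread ends up running the overflow task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {
    /** Returns the name of the thread that ran a task submitted while the pool was saturated. */
    static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        final String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the one-slot queue
            // Saturated: CallerRunsPolicy runs this task in the submitting thread
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work for a while, which is where the throttling effect comes from.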

Choosing a saturation policy or making other changes to the execution policy can be done when the Executor is created.

ThreadPoolExecutor executor =
    new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS,
                           new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

We could also use Semaphore to bound the task injection rate.

@ThreadSafe
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

Thread factories

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

Whenever a thread pool needs to create a thread, it does so through a thread factory. The default thread factory creates a new nondaemon thread with no special configuration. Specifying a thread factory allows you to customize the configuration of pool threads.

public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    public Thread newThread(Runnable runnable) {
        return new MyAppThread(runnable, poolName);
    }
}

MyThreadFactory will instantiate a new MyAppThread with a pool-specific name, so threads from each pool can be distinguished in thread dumps and error logs.

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,
                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}
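
A thread factory only takes effect once it is handed to a pool. The following self-contained sketch, with hypothetical names and a much simpler factory than the one above, shows one way to wire a naming factory into Executors.newFixedThreadPool and confirm that pool threads pick up the name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedFactoryDemo {
    /** Builds a factory whose threads are named poolName-1, poolName-2, and so on. */
    static ThreadFactory named(final String poolName) {
        final AtomicInteger counter = new AtomicInteger();
        return r -> new Thread(r, poolName + "-" + counter.incrementAndGet());
    }

    /** Runs one task in a pool built with the factory and returns the worker thread's name. */
    static String workerName(String poolName) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(1, named(poolName));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return exec.submit(whoAmI).get();
        } finally {
            exec.shutdown();
        }
    }
}
```

With this sketch, workerName("render") yields "render-1", the sort of name that makes a thread dump easier to read.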

Customizing ThreadPoolExecutor after construction

Most of the options passed to the ThreadPoolExecutor constructors can also be modified after construction via setters.

ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor) {
    ((ThreadPoolExecutor) exec).setCorePoolSize(10);
}

Executors includes a factory method, unconfigurableExecutorService, which takes an existing ExecutorService and wraps it with one exposing only the methods of ExecutorService, so that it cannot be further configured.
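
A short sketch of the effect, using hypothetical class and method names: once wrapped, the executor can no longer be downcast to ThreadPoolExecutor, so the setters are out of reach.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class UnconfigurableDemo {
    /** True if the executor can still be downcast and reconfigured. */
    static boolean isReconfigurable(ExecutorService exec) {
        return exec instanceof ThreadPoolExecutor;
    }

    public static void main(String[] args) {
        ExecutorService raw = Executors.newFixedThreadPool(2);
        ExecutorService sealed = Executors.unconfigurableExecutorService(raw);
        System.out.println(isReconfigurable(raw));    // true
        System.out.println(isReconfigurable(sealed)); // false
        sealed.shutdown(); // shutting down the wrapper shuts down the underlying pool
    }
}
```

Handing out only the wrapped reference is a simple way to protect an execution policy from code you do not trust to leave it alone.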

Extending ThreadPoolExecutor

ThreadPoolExecutor was designed for extension, providing several “hooks” for subclasses to override (beforeExecute, afterExecute, and terminated) that can be used to extend the behavior of ThreadPoolExecutor.

Example: adding statistics to a thread pool

public class TimingThreadPool extends ThreadPoolExecutor {
    public TimingThreadPool() {
        // A null work queue would throw NullPointerException, so supply a real one
        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            // t is the Throwable that ended the task (or null), so log the current thread
            log.fine(String.format("Thread %s: end %s, time=%dns",
                    Thread.currentThread(), r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected void terminated() {
        try {
            // Guard against division by zero when no task ever ran
            long tasks = numTasks.get();
            log.info(String.format("Terminated: avg time=%dns",
                    tasks == 0 ? 0L : totalTime.get() / tasks));
        } finally {
            super.terminated();
        }
    }
}

TimingThreadPool shows a custom thread pool that uses beforeExecute, afterExecute, and terminated to add logging and statistics gathering.

Parallelizing recursive algorithms

If we have a loop whose iterations are independent and we don’t need to wait for all of them to complete before processing, we can use an Executor to transform a sequential loop into a parallel one:

void processSequentially(List<Element> elements) {
    for (Element e : elements)
        process(e);
}

void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
        exec.execute(new Runnable() {
            public void run() { process(e); }
        });
}

Sequential loop iterations are suitable for parallelization when each iteration is independent of the others and the work done in each iteration of the loop body is significant enough to offset the cost of managing a new task.
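
processInParallel returns as soon as the tasks are submitted. When the caller does need every result before moving on, one option is ExecutorService.invokeAll, which blocks until all tasks have completed. The sketch below uses hypothetical names and a fixed pool of four threads chosen arbitrarily for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllExample {
    /** Squares each input in parallel and returns the results in input order. */
    static List<Integer> squareAll(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (final Integer n : inputs)
                tasks.add(() -> n * n);
            List<Integer> results = new ArrayList<Integer>();
            // invokeAll blocks until every task is done; futures come back in task order
            for (Future<Integer> f : exec.invokeAll(tasks))
                results.add(f.get());
            return results;
        } finally {
            exec.shutdown();
        }
    }
}
```

Squaring is far too cheap to be worth a task in practice; it stands in here for work that is significant enough to offset the cost of task management.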

Loop parallelization can also be applied to some recursive designs. The easier case is when each iteration does not require the results of the recursive iteration it invokes.

public <T> void sequentialRecursive(List<Node<T>> nodes,
                                    Collection<T> results) {
    for (Node<T> n : nodes) {
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

public <T> void parallelRecursive(final Executor exec,
                                  List<Node<T>> nodes,
                                  final Collection<T> results) {
    for (final Node<T> n : nodes) {
        exec.execute(new Runnable() {
            public void run() {
                results.add(n.compute());
            }
        });
        parallelRecursive(exec, n.getChildren(), results);
    }
}

public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
        throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
}

Example: A puzzle framework

We define a “puzzle” as a combination of an initial position, a goal position, and a set of rules that determine valid moves.

public interface Puzzle <P, M> {
    P initialPosition();
    boolean isGoal(P position);
    Set<M> legalMoves(P position);
    P move(P position, M move);
}

From this interface, we can write a simple sequential solver that searches the puzzle space until a solution is found or the puzzle space is exhausted.

@Immutable
public class PuzzleNode <P, M> {
    final P pos;
    final M move;
    final PuzzleNode<P, M> prev;

    public PuzzleNode(P pos, M move, PuzzleNode<P, M> prev) {
        this.pos = pos;
        this.move = move;
        this.prev = prev;
    }

    List<M> asMoveList() {
        List<M> solution = new LinkedList<M>();
        for (PuzzleNode<P, M> n = this; n.move != null; n = n.prev)
            solution.add(0, n.move);
        return solution;
    }
}

PuzzleNode represents a position that has been reached through some series of moves, holding a reference to the move that created the position and to the previous PuzzleNode.

public class SequentialPuzzleSolver <P, M> {
    private final Puzzle<P, M> puzzle;
    private final Set<P> seen = new HashSet<P>();

    public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
    }

    public List<M> solve() {
        P pos = puzzle.initialPosition();
        return search(new PuzzleNode<P, M>(pos, null, null));
    }

    private List<M> search(PuzzleNode<P, M> node) {
        if (!seen.contains(node.pos)) {
            seen.add(node.pos);
            if (puzzle.isGoal(node.pos))
                return node.asMoveList();
            for (M move : puzzle.legalMoves(node.pos)) {
                P pos = puzzle.move(node.pos, move);
                PuzzleNode<P, M> child = new PuzzleNode<P, M>(pos, move, node);
                List<M> result = search(child);
                if (result != null)
                    return result;
            }
        }
        return null;
    }
}

SequentialPuzzleSolver shows a sequential solver for the puzzle framework that performs a depth-first search of the puzzle space.

public class ConcurrentPuzzleSolver <P, M> {
    private final Puzzle<P, M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentMap<P, Boolean> seen;
    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
        this.exec = initThreadPool();
        this.seen = new ConcurrentHashMap<P, Boolean>();
        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }

    private ExecutorService initThreadPool() {
        return Executors.newCachedThreadPool();
    }

    public List<M> solve() throws InterruptedException {
        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));
            // block until solution found
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();
        }
    }

    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
        return new SolverTask(p, m, n);
    }

    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
            super(pos, move, prev);
        }

        public void run() {
            if (solution.isSet()
                    || seen.putIfAbsent(pos, true) != null)
                return; // already solved or seen this position
            if (puzzle.isGoal(pos))
                solution.setValue(this);
            else
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
        }
    }
}
@ThreadSafe
public class ValueLatch <T> {
    @GuardedBy("this") private T value = null;
    private final CountDownLatch done = new CountDownLatch(1);

    public boolean isSet() {
        return (done.getCount() == 0);
    }

    public synchronized void setValue(T newValue) {
        if (!isSet()) {
            value = newValue;
            done.countDown();
        }
    }

    public T getValue() throws InterruptedException {
        done.await();
        synchronized (this) {
            return value;
        }
    }
}

ConcurrentPuzzleSolver does not deal well with the case where there is no solution. One possible solution is to keep a count of active solver tasks and set the solution to null when the count drops to zero:

public class PuzzleSolver <P, M> extends ConcurrentPuzzleSolver<P, M> {
    PuzzleSolver(Puzzle<P, M> puzzle) {
        super(puzzle);
    }

    private final AtomicInteger taskCount = new AtomicInteger(0);

    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
        return new CountingSolverTask(p, m, n);
    }

    class CountingSolverTask extends SolverTask {
        CountingSolverTask(P pos, M move, PuzzleNode<P, M> prev) {
            super(pos, move, prev);
            taskCount.incrementAndGet();
        }

        public void run() {
            try {
                super.run();
            } finally {
                if (taskCount.decrementAndGet() == 0)
                    solution.setValue(null);
            }
        }
    }
}

Summary

The Executor framework is a powerful and flexible framework for concurrently executing tasks. It offers a number of tuning options, such as policies for creating and tearing down threads, handling queued tasks, and what to do with excess tasks, and provides several hooks for extending its behavior. As in most powerful frameworks, however, there are some combinations of settings that do not work well together; some types of tasks require specific execution policies, and some combinations of tuning parameters may produce strange results.

(To Be Continued)