Java Concurrency in Practice Notes: Task Execution

Most concurrent applications are organized around the execution of tasks: abstract, discrete units of work. Dividing the work of an application into tasks simplifies program organization, facilitates error recovery by providing natural transaction boundaries, and promotes concurrency by providing a natural structure for parallelizing work.

Task Execution

Executing tasks in threads

The first step in organizing a program around task execution is identifying sensible task boundaries. Ideally, tasks are independent activities: work that does not depend on the state, result, or side effects of other tasks.

Server applications should exhibit both good throughput and good responsiveness under normal load. Application providers want applications to support as many users as possible, so as to reduce provisioning costs per user; users want to get their response quickly.

Further, applications should exhibit graceful degradation as they become overloaded, rather than simply falling over under heavy load. Choosing good task boundaries, coupled with a sensible task execution policy, can help achieve these goals.

Executing tasks sequentially

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

SingleThreadWebServer is simple and theoretically correct, but would perform poorly in production because it can handle only one request at a time.

In server applications, sequential processing rarely provides either good throughput or good responsiveness. There are exceptions - such as when tasks are few and long-lived, or when the server serves a single client that makes only a single request at a time - but most server applications do not work this way.

Explicitly creating threads for tasks

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

ThreadPerTaskWebServer is similar in structure to the single-threaded version - the main thread still alternates between accepting an incoming connection and dispatching the request. The difference is that for each connection, the main loop creates a new thread to process the request instead of processing it within the main thread. This has three main consequences:

  • Task processing is offloaded from the main thread, enabling the main loop to resume waiting for the next incoming connection more quickly
  • Tasks can be processed in parallel, enabling multiple requests to be serviced simultaneously
  • Task-handling code must be thread-safe, because it may be invoked concurrently for multiple tasks

As long as the request arrival rate does not exceed the server's capacity to handle requests, this approach offers better responsiveness and throughput.

Disadvantages of unbounded thread creation

  • Thread lifecycle overhead. Thread creation and teardown are not free, and a server that creates a new thread for every request pays this cost on each one
  • Resource consumption. Active threads consume system resources, especially memory
  • Stability. There is a limit on how many threads can be created, which varies by platform and is affected by factors including JVM invocation parameters

Up to a certain point, more threads can improve throughput, but beyond that point creating more threads just slows down your application, and creating one thread too many can cause your entire application to crash horribly. The way to stay out of danger is to place some bound on how many threads your application creates, and to test your application thoroughly to ensure that, even when this bound is reached, it does not run out of resources.

The Executor framework

Tasks are logical units of work, and threads are a mechanism by which tasks can run asynchronously. We've examined two policies for executing tasks using threads - execute tasks sequentially in a single thread, and execute each task in its own thread. Both have limitations: the sequential approach suffers from poor responsiveness and throughput, and the thread-per-task approach suffers from poor resource management.

java.util.concurrent provides a flexible thread pool implementation as part of the Executor framework. The primary abstraction for task execution in the Java class libraries is not Thread, but Executor:

public interface Executor {
    void execute(Runnable command);
}

Executor provides a standard means of decoupling task submission from task execution, describing tasks with Runnable. The Executor implementations also provide lifecycle support and hooks for adding statistics gathering, application management, and monitoring.

Executor is based on the producer-consumer pattern, where activities that submit tasks are the producers and the threads that execute tasks are the consumers.

Example: web server using Executor

public class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec
            = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

In TaskExecutionWebServer, submission of the request-handling task is decoupled from its execution using an Executor, and its behavior can be changed merely by substituting a different Executor implementation.
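To make the substitution concrete, here are two tiny Executor implementations (sketches in the spirit of JCiP's ThreadPerTaskExecutor and WithinThreadExecutor listings; class names here are illustrative). Plugging the first into TaskExecutionWebServer reproduces the thread-per-task server's behavior, and the second reproduces the single-threaded server's:

```java
import java.util.concurrent.Executor;

public class ExecutorPolicies {
    // Thread-per-task policy: each submitted task gets its own new thread
    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }

    // Single-threaded, synchronous policy: the task runs in the caller's thread,
    // so execute does not return until the task completes
    static class WithinThreadExecutor implements Executor {
        public void execute(Runnable r) {
            r.run();
        }
    }

    public static void main(String[] args) {
        Executor exec = new WithinThreadExecutor();
        exec.execute(() -> System.out.println(
                "ran in " + Thread.currentThread().getName()));
    }
}
```

The submitting code is identical in both cases; only the policy object changes.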

Execution policies

The value of decoupling submission from execution is that it lets you easily specify, and subsequently change without great difficulty, the execution policy for a given class of tasks.

An execution policy specifies the “what, where, when, and how”:

  • In what thread will tasks be executed?
  • In what order should tasks be executed (FIFO, LIFO, priority order)?
  • How many tasks may execute concurrently?
  • How many tasks may be queued pending execution?
  • If a task has to be rejected because the system is overloaded, which task should be selected as the victim, and how should the application be notified?
  • What actions should be taken before or after executing a task?

Separating the specification of the execution policy from task submission makes it practical to select an execution policy at deployment time that is matched to the available hardware.

Whenever you see code of the form new Thread(runnable).start(), and you think you might at some point want a more flexible execution policy, seriously consider replacing it with the use of an Executor.

Thread pools

A thread pool, as its name suggests, manages a homogeneous pool of worker threads.

Executing tasks in pool threads has a number of advantages over the thread-per-task approach:

  • Reusing an existing thread instead of creating a new one amortizes thread creation and teardown costs over multiple requests
  • Since the worker thread often already exists at the time the request arrives, the latency associated with thread creation does not delay task execution, thus improving responsiveness

You could create a thread pool by calling one of the static factory methods in Executors:

  • newFixedThreadPool: A fixed-size thread pool creates threads as tasks are submitted, up to the maximum pool size, and then attempts to keep the pool size constant
  • newCachedThreadPool: A cached thread pool has more flexibility to reap idle threads when the current size of the pool exceeds the demand of processing, and to add new threads when demand increases, but places no bounds on the size of the pool
  • newSingleThreadExecutor: A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly
  • newScheduledThreadPool: A fixed-size thread pool that supports delayed and periodic task execution, similar to Timer
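As a quick sketch of creating each kind of pool (pool sizes are arbitrary, and the helper method is illustrative; assumes Java 8+ for the lambda):

```java
import java.util.concurrent.*;

public class PoolFactories {
    // Submit a trivial task to a fixed pool and return its result,
    // showing the submit/get/shutdown lifecycle end to end.
    static int demoFixedPool() throws Exception {
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        Future<Integer> f = fixed.submit(() -> 42);
        int result = f.get();
        fixed.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Cached pool: grows and shrinks with demand, no bound on pool size
        ExecutorService cached = Executors.newCachedThreadPool();
        // Single worker thread; tasks are processed sequentially
        ExecutorService single = Executors.newSingleThreadExecutor();
        // Fixed-size pool with support for delayed and periodic execution
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);

        System.out.println(demoFixedPool()); // prints 42

        cached.shutdown();
        single.shutdown();
        scheduled.shutdown();
    }
}
```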

Executor lifecycle

To address the issue of execution service lifecycle, the ExecutorService interface extends Executor, adding a number of methods for lifecycle management:

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    ...
}

The lifecycle implied by ExecutorService has three states - running, shutting down, and terminated:

  • ExecutorServices are initially created in the running state
  • shutdown initiates a graceful shutdown: no new tasks are accepted but previously submitted tasks are allowed to complete
  • shutdownNow initiates an abrupt shutdown: it attempts to cancel outstanding tasks and does not start any tasks that are queued but not yet begun
  • Tasks submitted to an ExecutorService after it has been shut down are handled by the rejected execution handler
  • Once all tasks have completed, the ExecutorService transitions to the terminated state
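A minimal sketch of the graceful-shutdown sequence described above (the class name and the 5-second wait bound are illustrative):

```java
import java.util.concurrent.*;

public class ShutdownDemo {
    // Graceful shutdown: stop accepting new tasks, let previously submitted
    // tasks finish, then wait (with a bound) for the terminated state.
    static boolean shutdownGracefully(ExecutorService exec)
            throws InterruptedException {
        exec.shutdown();                // running -> shutting down
        // true once all tasks complete and the pool reaches terminated
        return exec.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        exec.execute(() -> System.out.println("task ran"));
        System.out.println(shutdownGracefully(exec)); // prints true
    }
}
```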
public class LifecycleWebServer {
    private final ExecutorService exec = Executors.newCachedThreadPool();

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(conn);
                    }
                });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() {
        exec.shutdown();
    }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }
}

LifecycleWebServer extends our web server with lifecycle support. It can be shut down in two ways:

  • programmatically, by calling stop
  • by sending the web server a specially formatted HTTP request

Delayed and periodic tasks

The Timer facility manages the execution of deferred and periodic tasks. However, Timer has some drawbacks, and ScheduledThreadPoolExecutor should be thought of as its replacement.

A Timer creates only a single thread for executing timer tasks; if a timer task takes too long to run, the timing accuracy of other TimerTasks can suffer.

Another problem with Timer is that it behaves poorly if a TimerTask throws an unchecked exception. The Timer thread doesn't catch the exception, so an unchecked exception thrown from a TimerTask terminates the timer thread. Timer also doesn't resurrect the thread in this situation; instead, it erroneously assumes the entire Timer was cancelled. In this case, TimerTasks that are already scheduled but not yet executed are never run, and new tasks cannot be scheduled (a problem known as "thread leakage").

import static java.util.concurrent.TimeUnit.SECONDS;

public class OutOfTime {
    public static void main(String[] args) throws Exception {
        Timer timer = new Timer();
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(1);
        timer.schedule(new ThrowTask(), 1);
        SECONDS.sleep(5);
    }

    static class ThrowTask extends TimerTask {
        public void run() {
            throw new RuntimeException();
        }
    }
}
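ScheduledThreadPoolExecutor does not share these problems: it can use multiple threads, and a task that throws does not kill the executor (only that task's subsequent executions are suppressed). A minimal sketch of periodic scheduling as a Timer replacement (the class name and timings are illustrative):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledDemo {
    // Run a periodic task for a short while, then shut down;
    // returns how many executions happened.
    static int runPeriodic() throws InterruptedException {
        ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        // Fixed-rate schedule, analogous to Timer.scheduleAtFixedRate:
        // start immediately, repeat every 50 ms
        sched.scheduleAtFixedRate(runs::incrementAndGet,
                0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        sched.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPeriodic()); // several runs in ~300 ms
    }
}
```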

If you need to build your own scheduling service, you may still be able to take advantage of the library by using a DelayQueue, a BlockingQueue implementation that provides the scheduling functionality of ScheduledThreadPoolExecutor.
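A sketch of that idea: queue elements implement Delayed, and take blocks until the head element's delay has expired, so elements come out in trigger order regardless of insertion order. The DelayedTask type here is hypothetical, written for illustration:

```java
import java.util.concurrent.*;

public class DelayQueueDemo {
    // An element that becomes available only after its delay expires
    static class DelayedTask implements Delayed {
        final String name;
        final long triggerNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerNanos - System.nanoTime(),
                                TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Insert in the "wrong" order; take() still yields the soonest task first
    static String firstOut() throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("later", 200));
        queue.put(new DelayedTask("sooner", 50));
        return queue.take().name;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstOut()); // prints "sooner"
    }
}
```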

Finding exploitable parallelism

Sometimes good task boundaries are not quite so obvious, as in many desktop applications. There may also be exploitable parallelism within a single client request in server applications.

In this section we develop several versions of a component that admit varying degrees of concurrency. Our sample component is the page-rendering portion of a browser application, which takes a page of HTML and renders it into an image buffer.

Example: sequential page renderer

The simplest approach is to process the HTML document sequentially. As text markup is encountered, render it into the image buffer; as image references are encountered, fetch the image over the network and draw it into the image buffer as well.

A less annoying but still sequential approach involves rendering the text elements first, leaving rectangular placeholders for the images, and after completing the initial pass on the document, going back and downloading the images and drawing them into the associated placeholder.

public abstract class SingleThreadRenderer {
    void renderPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageData = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanForImageInfo(source))
            imageData.add(imageInfo.downloadImage());
        for (ImageData data : imageData)
            renderImage(data);
    }
}

Downloading an image mostly involves waiting for I/O to complete, and during this time the CPU does little work, so the sequential approach may under-utilize the CPU, and also makes the user wait longer than necessary to see the finished page.

Result-bearing tasks: Callable and Future

Many tasks are effectively deferred computations - executing a database query, fetching a resource over the network, or computing a complicated function. For these types of tasks, Callable is a better abstraction: it expects that the main entry point, call, will return a value and anticipates that it might throw an exception.

Runnable and Callable describe abstract computational tasks. Tasks are usually finite: they have a clear starting point and they eventually terminate. The lifecycle of a task executed by an Executor has four phases: created, submitted, started, completed.

public interface Callable<V> {
    V call() throws Exception;
}

Future represents the lifecycle of a task and provides methods to test whether the task has completed or been cancelled, retrieve its result, and cancel the task.

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException,
            CancellationException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException,
                   CancellationException, TimeoutException;
}

The behavior of get varies depending on the task state (not yet started, running, completed): it returns immediately or throws an exception if the task has already completed, but if not it blocks until the task completes.

If the task completes by throwing an exception, get rethrows it wrapped in an ExecutionException; if it was cancelled, get throws CancellationException. If get throws ExecutionException, the underlying exception can be retrieved with getCause.
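The renderer listings that follow rethrow the result of getCause via a launderThrowable helper. JCiP defines such a helper; a sketch of it:

```java
public class Launder {
    // Coerce the Throwable obtained from ExecutionException.getCause into an
    // unchecked exception: RuntimeExceptions are returned for the caller to
    // rethrow, Errors are rethrown directly, and a checked exception here
    // indicates a logic error, since the caller should have handled it.
    public static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException)
            return (RuntimeException) t;
        else if (t instanceof Error)
            throw (Error) t;
        else
            throw new IllegalStateException("Not unchecked", t);
    }
}
```

The `throw launderThrowable(e.getCause())` idiom lets the compiler see that the catch block always completes abruptly.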

Submitting a Runnable or Callable to an Executor constitutes a safe publication of the Runnable or Callable from the submitting thread to the thread that will eventually execute the task.

Example: page renderer with Future

As a first step towards making the page renderer more concurrent, let's divide it into two tasks: one that renders the text and one that downloads all the images.

public abstract class FutureRenderer {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task =
                new Callable<List<ImageData>>() {
                    public List<ImageData> call() {
                        List<ImageData> result = new ArrayList<ImageData>();
                        for (ImageInfo imageInfo : imageInfos)
                            result.add(imageInfo.downloadImage());
                        return result;
                    }
                };
        Future<List<ImageData>> future = executor.submit(task);
        renderText(source);
        try {
            List<ImageData> imageData = future.get();
            for (ImageData data : imageData)
                renderImage(data);
        } catch (InterruptedException e) {
            // Re-assert the thread's interrupted status
            Thread.currentThread().interrupt();
            // We don't need the result, so cancel the task too
            future.cancel(true);
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

We create a Callable for downloading all the images, and submit it to an ExecutorService. This returns a Future describing the task’s execution; when the main task gets to the point where it needs the images, it waits for the result by calling Future.get.

FutureRenderer allows the text to be rendered concurrently with downloading the image data. This is an improvement in that the user sees the result quickly and it exploits some parallelism, but we could do considerably better. There is no need for users to wait for all the images to be downloaded; they would probably prefer to see individual images drawn as they become available.

Limitations of parallelizing heterogeneous tasks

Obtaining significant performance improvements by trying to parallelize sequential heterogeneous tasks can be tricky.

A further problem with dividing heterogeneous tasks among multiple workers is that the tasks may have disparate sizes. If you divide tasks A and B between two workers but A takes ten times as long as B, you've only sped up the total process by about 9%. Finally, dividing a task among multiple workers always involves some coordination overhead; for the division to be worthwhile, this overhead must be more than compensated by productivity improvements due to parallelism.

The real performance payoff of dividing a program's workload into tasks comes when there are a large number of independent, homogeneous tasks that can be processed concurrently.

CompletionService: Executor meets BlockingQueue

CompletionService combines the functionality of an Executor and a BlockingQueue. You can submit Callable tasks to it for execution and use the queue-like methods take and poll to retrieve completed results, packaged as Futures, as they become available.
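A minimal sketch of the take-in-completion-order behavior (the class name and timings are illustrative; assumes Java 8+ for the lambdas): the faster task comes out first even though it was submitted second.

```java
import java.util.concurrent.*;

public class CompletionOrderDemo {
    // take() returns futures in completion order, not submission order
    static int firstCompleted() throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        CompletionService<Integer> cs =
                new ExecutorCompletionService<>(exec);
        cs.submit(() -> { Thread.sleep(300); return 300; }); // slow, first in
        cs.submit(() -> { Thread.sleep(50);  return 50; });  // fast, second in
        int first = cs.take().get(); // blocks until the first completion
        exec.shutdownNow();
        return first;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstCompleted()); // prints 50
    }
}
```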

Example: page renderer with CompletionService

public abstract class Renderer {
    private final ExecutorService executor;

    Renderer(ExecutorService executor) {
        this.executor = executor;
    }

    void renderPage(CharSequence source) {
        final List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService =
                new ExecutorCompletionService<ImageData>(executor);
        for (final ImageInfo imageInfo : info)
            completionService.submit(new Callable<ImageData>() {
                public ImageData call() {
                    return imageInfo.downloadImage();
                }
            });
        renderText(source);
        try {
            for (int t = 0, n = info.size(); t < n; t++) {
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

We can use a CompletionService to improve the performance of the page renderer in two ways: shorter total runtime and improved responsiveness.

We can create a separate task for downloading each image and execute them in a thread pool, turning the sequential download into a parallel one: this reduces the amount of time to download all the images.

And by fetching results from the CompletionService and rendering each image as soon as it is available, we can give the user a more dynamic and responsive user interface.

Placing time limits on tasks

Sometimes, if an activity does not complete within a certain amount of time, the result is no longer needed and the activity can be abandoned.

The primary challenge in executing tasks within a time budget is making sure that you don’t wait longer than the time budget to get an answer or find out that one is not forthcoming.

A secondary problem when using timed tasks is to stop them when they run out of time, so they do not waste computing resources by continuing to compute a result that will not be used.

public class RenderWithTimeBudget {
    private static final Ad DEFAULT_AD = new Ad();
    private static final long TIME_BUDGET = 1000; // milliseconds
    private static final ExecutorService exec = Executors.newCachedThreadPool();

    Page renderPageWithAd() throws InterruptedException {
        long endNanos = System.nanoTime() + MILLISECONDS.toNanos(TIME_BUDGET);
        Future<Ad> f = exec.submit(new FetchAdTask());
        // Render the page while waiting for the ad
        Page page = renderPageBody();
        Ad ad;
        try {
            // Only wait for the remaining time budget
            long timeLeft = endNanos - System.nanoTime();
            ad = f.get(timeLeft, NANOSECONDS);
        } catch (ExecutionException e) {
            ad = DEFAULT_AD;
        } catch (TimeoutException e) {
            ad = DEFAULT_AD;
            f.cancel(true);
        }
        page.setAd(ad);
        return page;
    }
}

Example: a travel reservations portal

The time-budgeting approach in the previous section can be easily generalized to an arbitrary number of tasks. Consider a travel reservation portal: the user enters travel dates and requirements and the portal fetches and displays bids from a number of airlines, hotels or car rental companies. Depending on the company, fetching a bid might involve invoking a web service, consulting a database, performing an EDI transaction, or some other mechanism. Rather than have the response time for the page be driven by the slowest response, it may be preferable to present only the information available within a given time budget.

public class TimeBudget {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    public List<TravelQuote> getRankedTravelQuotes(
            TravelInfo travelInfo, Set<TravelCompany> companies,
            Comparator<TravelQuote> ranking, long time, TimeUnit unit)
            throws InterruptedException {
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies)
            tasks.add(new QuoteTask(company, travelInfo));
        List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
        List<TravelQuote> quotes =
                new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIter = tasks.iterator();
        for (Future<TravelQuote> f : futures) {
            QuoteTask task = taskIter.next();
            try {
                quotes.add(f.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
            } catch (CancellationException e) {
                quotes.add(task.getTimeoutQuote(e));
            }
        }
        Collections.sort(quotes, ranking);
        return quotes;
    }
}

class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;

    public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
        this.company = company;
        this.travelInfo = travelInfo;
    }

    TravelQuote getFailureQuote(Throwable t) {
        return null;
    }

    TravelQuote getTimeoutQuote(CancellationException e) {
        return null;
    }

    public TravelQuote call() throws Exception {
        return company.solicitQuote(travelInfo);
    }
}

The above code uses the timed version of invokeAll to submit multiple tasks to an ExecutorService and retrieve the results. The timed version of invokeAll returns when all the tasks have completed, the calling thread is interrupted, or the timeout expires. Any tasks that are not completed when the timeout expires are cancelled.
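A small sketch of that timeout behavior (class name and timings are illustrative; assumes Java 8+ for the lambdas): the fast task completes within the timeout, while the deliberately slow one is cancelled when it expires.

```java
import java.util.*;
import java.util.concurrent.*;

public class InvokeAllDemo {
    // Returns each future's cancellation flag after a timed invokeAll:
    // the completed task reports false, the timed-out task reports true.
    static List<Boolean> cancelledFlags() throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        List<Callable<String>> tasks = Arrays.asList(
                () -> "fast",
                () -> { Thread.sleep(5000); return "slow"; });
        List<Future<String>> futures =
                exec.invokeAll(tasks, 200, TimeUnit.MILLISECONDS);
        List<Boolean> flags = new ArrayList<>();
        for (Future<String> f : futures)
            flags.add(f.isCancelled());
        exec.shutdownNow();
        return flags;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelledFlags()); // prints [false, true]
    }
}
```

Note that invokeAll returns the futures in the same order as the task list, which is what lets TimeBudget walk the two lists in lockstep.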

Summary

  • Structuring applications around the execution of tasks can simplify development and facilitate concurrency. The Executor framework permits you to decouple task submission from execution policy and supports a rich variety of execution policies
  • Whenever you find yourself creating threads to perform tasks, consider using an Executor instead
  • To maximize the benefit of decomposing an application into tasks, you must identify sensible task boundaries
    • In some applications, the obvious task boundaries work well
    • In others analysis may be required to uncover finer-grained exploitable parallelism

(To Be Continued)