Java Concurrency in Practice Notes: Building Blocks (Part 1)

(These notes are from reading Brian Goetz's Java Concurrency in Practice.)

Chapter 5 Building blocks

This chapter covers the most useful concurrent building blocks, especially those introduced in Java 5.0 and Java 6, and some patterns for using them to structure concurrent applications.

Synchronized collections

The synchronized collection classes include Vector and Hashtable, part of the original JDK, as well as their cousins added in JDK 1.2, the synchronized wrapper classes created by the Collections.synchronizedXxx factory methods.

These classes achieve thread safety by encapsulating their state and synchronizing every public method so that only one thread at a time can access the collection state.

Problems with synchronized collections

The synchronized collections are thread-safe, but you may sometimes need to use additional client-side locking to guard compound actions.

public class UnsafeVectorHelpers {
    public static Object getLast(Vector list) {
        int lastIndex = list.size() - 1;
        return list.get(lastIndex);
    }

    public static void deleteLast(Vector list) {
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }
}

Interleaved operations could cause an ArrayIndexOutOfBoundsException: another thread may shrink the vector between the call to size and the call to get or remove.

Because the synchronized collections commit to a synchronization policy that supports client-side locking, it is possible to create new operations that are atomic with respect to other collection operations as long as we know which lock to use.

public class SafeVectorHelpers {
    public static Object getLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1;
            return list.get(lastIndex);
        }
    }

    public static void deleteLast(Vector list) {
        synchronized (list) {
            int lastIndex = list.size() - 1;
            list.remove(lastIndex);
        }
    }
}

The iteration below could also throw ArrayIndexOutOfBoundsException if another thread modifies the vector between the calls to size and get:

for (int i = 0; i < vector.size(); i++) {
    doSomething(vector.get(i));
}

Again the fix is a client-side lock, although holding it prevents other threads from accessing the vector at all during the iteration, impairing concurrency:

synchronized (vector) {
    for (int i = 0; i < vector.size(); i++) {
        doSomething(vector.get(i));
    }
}

Iterators and ConcurrentModificationException

The iterators returned by the synchronized collections are not designed to deal with concurrent modification, and they are fail-fast - meaning that if they detect that the collection has changed since iteration began, they throw the unchecked ConcurrentModificationException.

List<Widget> widgetList =
    Collections.synchronizedList(new ArrayList<Widget>());
...
// May throw ConcurrentModificationException
for (Widget w : widgetList) {
    doSomething(w);
}

There are several reasons, however, why locking a collection during iteration may be undesirable. Other threads that need to access the collection will block until the iteration is complete; if the collection is large or the task performed for each element is lengthy, they could wait a long time. Also, if the task performed during iteration acquires other locks while the collection lock is held, there is a risk of deadlock.

An alternative to locking the collection during the iteration is to clone the collection and iterate the copy instead.
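
A minimal sketch of the clone-then-iterate approach (the helper name and the per-element work are illustrative; the snapshot is copied while holding the list's own lock, so the copy itself is atomic):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SnapshotIteration {
    // Copy the list under its own lock, then iterate the private snapshot.
    // Other threads are blocked only for the duration of the copy,
    // not for the whole traversal.
    public static int sumLengths(List<String> list) {
        List<String> snapshot;
        synchronized (list) {
            snapshot = new ArrayList<String>(list);
        }
        int total = 0;
        for (String s : snapshot)   // no lock held during the traversal
            total += s.length();
        return total;
    }

    public static void main(String[] args) {
        List<String> list = Collections.synchronizedList(new ArrayList<String>());
        list.add("ab");
        list.add("cde");
        System.out.println(sumLengths(list)); // prints 5
    }
}
```

Whether cloning wins over locking depends on the collection size, the work done per element, and how contended the collection is; it trades the cost of the copy for a shorter lock hold time.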

Hidden iterators

public class HiddenIterator {
    @GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>();

    public synchronized void add(Integer i) {
        set.add(i);
    }

    public synchronized void remove(Integer i) {
        set.remove(i);
    }

    public void addTenThings() {
        Random r = new Random();
        for (int i = 0; i < 10; i++)
            add(r.nextInt());
        System.out.println("DEBUG: added ten elements to " + set);
    }
}

Although there are no explicit iterators above, the string concatenation gets turned by the compiler into a call to StringBuilder.append(Object), which in turn invokes the collection's toString method - and the implementation of toString in the standard collections iterates the collection and calls toString on each element.

Iteration is also indirectly invoked by the collection's hashCode and equals methods. All of these indirect uses of iteration can cause ConcurrentModificationException.

Just as encapsulating an object’s state makes it easier to preserve its invariants, encapsulating its synchronization makes it easier to enforce its synchronization policy.

Concurrent collections

Java 5.0 improves on the synchronized collections by providing several concurrent collection classes. The synchronized collections achieve their thread safety by serializing all access to the collection's state. The concurrent collections, on the other hand, are designed for concurrent access from multiple threads.

  • ConcurrentHashMap: a replacement for synchronized hash-based Map implementations
  • CopyOnWriteArrayList: a replacement for synchronized List implementations for cases where traversal is the dominant operation
  • Queue, BlockingQueue, ConcurrentLinkedQueue, PriorityQueue
  • ConcurrentSkipListMap, ConcurrentSkipListSet

Replacing synchronized collections with concurrent collections can offer dramatic scalability improvements with little risk.

ConcurrentHashMap

ConcurrentHashMap is a hash-based Map like HashMap, but it uses an entirely different locking strategy that offers better concurrency and scalability. Instead of synchronizing every method on a common lock, restricting access to a single thread at a time, it uses a finer-grained locking mechanism called lock striping to allow a greater degree of shared access.

Arbitrarily many reading threads can access the map concurrently, readers can access the map concurrently with writers, and a limited number of writers can modify the map concurrently. This results in far higher throughput under concurrent access, with little performance penalty for single-threaded access.

ConcurrentHashMap, along with the other concurrent collections, further improves on the synchronized collection classes by providing iterators that do not throw ConcurrentModificationException, thus eliminating the need to lock the collection during iteration. The iterators returned by ConcurrentHashMap are weakly consistent instead of fail-fast.

A weakly consistent iterator can tolerate concurrent modification, traverse elements as they existed when the iterator was constructed, and may (but is not guaranteed to) reflect modifications to the collection after the construction of the iterator.
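
This tolerance can be demonstrated directly (a behavior sketch, not a listing from the book): modifying a ConcurrentHashMap in the middle of iterating it does not throw ConcurrentModificationException, though whether the iterator sees the new entry is unspecified.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistentDemo {
    static int iterateWhileModifying() {
        Map<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        map.put("a", 1);
        map.put("b", 2);

        Iterator<String> it = map.keySet().iterator();
        while (it.hasNext()) {
            it.next();
            map.put("c", 3); // modification mid-iteration: no exception thrown
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileModifying()); // prints 3
    }
}
```

A synchronized HashMap in the same situation would throw ConcurrentModificationException from it.next().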

The semantics of methods that operate on the entire Map, such as size and isEmpty, have been slightly weakened to reflect the concurrent nature of the collection.

The one feature offered by the synchronized Map implementations but not by ConcurrentHashMap is the ability to lock the map for exclusive access.

Replacing synchronized Map implementations with ConcurrentHashMap in most cases results only in better scalability.

Additional atomic Map operations

Since a ConcurrentHashMap cannot be locked for exclusive access, we cannot use client-side locking to create new atomic operations such as put-if-absent. Instead, a number of common compound operations such as put-if-absent, remove-if-equal, and replace-if-equal are implemented as atomic operations and specified by the ConcurrentMap interface.

public interface ConcurrentMap<K, V> extends Map<K, V> {
    // Insert into map only if no value is mapped from K
    V putIfAbsent(K key, V value);

    // Remove only if K is mapped to V
    boolean remove(K key, V value);

    // Replace value only if K is mapped to oldValue
    boolean replace(K key, V oldValue, V newValue);

    // Replace value only if K is mapped to some value
    V replace(K key, V newValue);
}
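
For example, putIfAbsent replaces the racy check-then-act idiom (containsKey followed by put) with a single atomic operation. A usage sketch, assuming nothing beyond the ConcurrentMap interface above:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<String, Integer>();

        // Atomic put-if-absent: returns the previous value, or null if absent.
        Integer prev1 = cache.putIfAbsent("answer", 42); // absent: inserted
        Integer prev2 = cache.putIfAbsent("answer", 99); // present: no-op

        System.out.println(prev1);               // prints null
        System.out.println(prev2);               // prints 42
        System.out.println(cache.get("answer")); // prints 42
    }
}
```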

CopyOnWriteArrayList

CopyOnWriteArrayList is a concurrent replacement for a synchronized List that offers better concurrency in some common situations and eliminates the need to lock or copy the collection during iteration. (Similarly, CopyOnWriteArraySet is a concurrent replacement for a synchronized Set.)

They implement mutability by creating and republishing a new copy of the collection every time it is modified. The iterators returned by the copy-on-write collections do not throw ConcurrentModificationException and return the elements exactly as they were at the time the iterator was created, regardless of subsequent modifications.
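
The snapshot semantics can be seen directly: an iterator obtained before a modification still traverses the old copy. A small sketch:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<String>();
        list.add("a");
        list.add("b");

        Iterator<String> it = list.iterator(); // snapshot of ["a", "b"]
        list.add("c");                         // republishes a new backing array

        int seen = 0;
        while (it.hasNext()) { it.next(); seen++; }
        System.out.println(seen);        // prints 2: the snapshot is unchanged
        System.out.println(list.size()); // prints 3: the list itself sees "c"
    }
}
```

The copy on every mutation is only worthwhile when iteration is far more common than modification, which is why the text calls traversal the dominant operation for this class.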

Blocking queues and the producer-consumer pattern

Blocking queues provide blocking put and take methods as well as the timed equivalents offer and poll. If the queue is full, put blocks until space becomes available; if the queue is empty, take blocks until an element is available.
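
A minimal sketch of the blocking and timed variants on a bounded ArrayBlockingQueue (the capacity and timeout values are arbitrary):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

        queue.put("a");   // succeeds immediately
        queue.put("b");   // queue is now full; another put would block

        // Timed offer: give up after 50 ms instead of blocking indefinitely.
        boolean accepted = queue.offer("c", 50, TimeUnit.MILLISECONDS);
        System.out.println(accepted);         // prints false: no space appeared

        System.out.println(queue.take());     // prints a (FIFO order)
        System.out.println(queue.offer("c")); // prints true: space is available now
    }
}
```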

In a producer-consumer design built around a blocking queue, producers place data onto the queue as it becomes available, and consumers retrieve data from the queue when they are ready to take the appropriate action. Producers don't need to know anything about the identity or number of consumers, or even whether they are the only producer - all they have to do is place data items on the queue. Similarly, consumers need not know who the producers are or where the work came from.

If the producers consistently generate work faster than the consumers can process it, eventually the application will run out of memory because work items will queue up without bound. If we use a bounded queue, then when the queue fills up the producers block, giving the consumers time to catch up because a blocked producer cannot generate more work.

Bounded queues are a powerful resource management tool for building reliable applications: they make your program more robust to overload by throttling activities that threaten to produce more work than can be handled.

The class library contains several implementations of BlockingQueue. LinkedBlockingQueue and ArrayBlockingQueue are FIFO queues, analogous to LinkedList and ArrayList but with better concurrent performance than a synchronized List. PriorityBlockingQueue is a priority-ordered queue, which is useful when you want to process elements in an order other than FIFO.

81
public class ProducerConsumer {
    static class FileCrawler implements Runnable {
        private final BlockingQueue<File> fileQueue;
        private final FileFilter fileFilter;
        private final File root;

        public FileCrawler(BlockingQueue<File> fileQueue,
                           final FileFilter fileFilter,
                           File root) {
            this.fileQueue = fileQueue;
            this.root = root;
            this.fileFilter = new FileFilter() {
                public boolean accept(File f) {
                    return f.isDirectory() || fileFilter.accept(f);
                }
            };
        }

        private boolean alreadyIndexed(File f) {
            return false;
        }

        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries)
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry))
                        fileQueue.put(entry);
            }
        }
    }

    static class Indexer implements Runnable {
        private final BlockingQueue<File> queue;

        public Indexer(BlockingQueue<File> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                while (true)
                    indexFile(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void indexFile(File file) {
            // Index the file...
        }
    }

    private static final int BOUND = 10;
    private static final int N_CONSUMERS =
        Runtime.getRuntime().availableProcessors();

    public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };
        for (File root : roots)
            new Thread(new FileCrawler(queue, filter, root)).start();
        for (int i = 0; i < N_CONSUMERS; i++)
            new Thread(new Indexer(queue)).start();
    }
}

Serial thread confinement

For mutable objects, producer-consumer designs and blocking queues facilitate serial thread confinement for handing off ownership of objects from producers to consumers.

One could also use other publication mechanisms for transferring ownership of a mutable object, but it is necessary to ensure that only one thread receives the object being handed off.

Deques and work stealing

Java 6 adds two more collection types, Deque and BlockingDeque, that extend Queue and BlockingQueue. A Deque is a double-ended queue that allows efficient insertion and removal from both the head and the tail. Implementations include ArrayDeque and LinkedBlockingDeque.

Just as blocking queues lend themselves to the producer-consumer pattern, deques lend themselves to a related pattern called work stealing. A producer-consumer design has one shared work queue for all consumers; in a work-stealing design, every consumer has its own deque. If a consumer exhausts the work in its own deque, it can steal work from the tail of someone else's deque.

Work stealing is well suited to problems in which consumers are also producers - when performing a unit of work is likely to result in the identification of more work.
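
The head/tail discipline can be illustrated with a plain Deque (this is a single-threaded sketch of the access pattern only; a real work-stealing pool such as ForkJoinPool handles the actual concurrency):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WorkStealingSketch {
    public static void main(String[] args) {
        // The owning consumer pushes and pops its own work at the head.
        Deque<String> ownerDeque = new ArrayDeque<String>();
        ownerDeque.addFirst("task1");
        ownerDeque.addFirst("task2");
        ownerDeque.addFirst("task3"); // head: task3 ... tail: task1

        // The owner works LIFO from the head:
        System.out.println(ownerDeque.pollFirst()); // prints task3

        // An idle thief steals the oldest task from the tail,
        // minimizing contention with the owner at the head:
        System.out.println(ownerDeque.pollLast());  // prints task1
    }
}
```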

Blocking and interruptible methods

When a method can throw InterruptedException, it is telling you that it is a blocking method, and further that if it is interrupted, it will make an effort to stop blocking early.

Interruption is a cooperative mechanism. One thread cannot force another to stop what it is doing and do something else; when thread A interrupts thread B, A is merely requesting that B stop what it is doing when it gets to a convenient stopping point - if it feels like it.

When your code calls a method that throws InterruptedException, then your method is a blocking method too, and must have a plan for responding to interruption. For library code, there are basically two choices:

  • Propagate the InterruptedException
  • Restore the interrupt: Catch InterruptedException and restore the interrupted status by calling interrupt on current thread

You should not catch InterruptedException and do nothing in response. This deprives code higher up on the call stack of the opportunity to act on the interruption, because the evidence that the thread was interrupted is lost.
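
A sketch of the first choice, propagation (the class and method names are illustrative): declaring InterruptedException makes the method a blocking method in turn, and pushes the decision up to the caller.

```java
import java.util.concurrent.BlockingQueue;

public class TaskSource {
    private final BlockingQueue<String> queue;

    public TaskSource(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // Propagate InterruptedException rather than swallowing it:
    // callers now know this method can block and can be interrupted.
    public String getNextTask() throws InterruptedException {
        return queue.take();
    }
}
```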

public class TaskRunnable implements Runnable {
    BlockingQueue<Task> queue;

    public void run() {
        try {
            processTask(queue.take());
        } catch (InterruptedException e) {
            // restore interrupted status
            Thread.currentThread().interrupt();
        }
    }

    void processTask(Task task) {
        // Handle the task
    }

    interface Task {
    }
}

Synchronizers

A synchronizer is any object that coordinates the control flow of threads based on its state. Blocking queues can act as synchronizers; other types of synchronizers include semaphores, barriers, and latches.

All synchronizers share certain structural properties: they encapsulate state that determines whether threads arriving at the synchronizer should be allowed to pass or forced to wait, provide methods to manipulate that state, and provide methods to wait efficiently for the synchronizer to enter the desired state.

Latches

A latch is a synchronizer that can delay the progress of threads until it reaches its terminal state. A latch acts as a gate: until the latch reaches the terminal state the gate is closed and no thread can pass, and in the terminal state the gate opens, allowing all threads to pass.

Once the latch reaches the terminal state, it cannot change state again, so it remains open forever. Latches can be used to ensure that certain activities do not proceed until other one-time activities complete, such as:

  • Ensuring that a computation does not proceed until the resources it needs have been initialized
  • Ensuring that a service does not start until other services on which it depends have started
  • Waiting until all the parties involved in an activity are ready to proceed

e.g. Using CountDownLatch for starting and stopping threads in timing tests.

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

FutureTask

FutureTask acts like a latch. A computation represented by a FutureTask is implemented with a Callable, the result-bearing equivalent of Runnable, and can be in one of three states: waiting to run, running, or completed.

Completion subsumes all the ways a computation can complete, including normal completion, cancellation, and exception. Once a FutureTask enters the completed state, it stays in that state forever.

The behavior of Future.get depends on the state of the task: if it is completed, get returns the result immediately; otherwise it blocks until the task transitions to the completed state and then returns the result or throws an exception.

FutureTask conveys the result from the thread executing the computation to the thread retrieving the result; the specification of FutureTask guarantees that this transfer constitutes a safe publication of the result.

FutureTask is used by the Executor framework to represent asynchronous tasks, and can also be used to represent any potentially lengthy computation that can be started before the results are needed.

public class Preloader {
    private final FutureTask<ProductInfo> future =
        new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
            public ProductInfo call() throws DataLoadException {
                return loadProductInfo();
            }
        });
    private final Thread thread = new Thread(future);

    public void start() { thread.start(); }

    public ProductInfo get()
            throws DataLoadException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause;
            else
                throw LaunderThrowable.launderThrowable(cause);
        }
    }
}

When get throws an ExecutionException, the cause will fall into one of three categories: a checked exception thrown by the Callable, a RuntimeException, or an Error. We should handle each of these cases separately.

public static RuntimeException launderThrowable(Throwable t) {
    if (t instanceof RuntimeException)
        return (RuntimeException) t;
    else if (t instanceof Error)
        throw (Error) t;
    else
        throw new IllegalStateException("Not unchecked", t);
}

Semaphores

Counting semaphores are used to control the number of activities that can access a certain resource or perform a given action at the same time. Counting semaphores can be used to implement resource pools or to impose a bound on a collection.
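
The basic permit mechanics, as a minimal sketch (the initial permit count of 2 is arbitrary):

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(2); // at most 2 permits outstanding

        sem.acquire();  // 1 permit left
        sem.acquire();  // 0 permits left

        // Non-blocking attempt: returns false immediately instead of blocking.
        System.out.println(sem.tryAcquire()); // prints false

        sem.release();  // return a permit
        System.out.println(sem.tryAcquire()); // prints true
    }
}
```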

Semaphores are useful for implementing resource pools such as database connection pools. And you could use a Semaphore to turn any collection into a blocking bounded collection.

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded)
                sem.release();
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

Barrier

Barriers are similar to latches in that they block a group of threads until some event has occurred. The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events; barriers are for waiting for other threads.

CyclicBarrier allows a fixed number of parties to rendezvous repeatedly at a barrier point and is useful in parallel iterative algorithms that break down a problem into a fixed number of independent subproblems. Threads call await when they reach the barrier point, and await blocks until all threads have reached the barrier point.

Barriers are often used in simulations, where the work to calculate one step can be done in parallel but all the work associated with a given step must complete before advancing to the next step.

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable() {
                    public void run() {
                        mainBoard.commitNewValues();
                    }
                });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++)
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
    }

    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) {
            this.board = board;
        }

        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY(); y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++)
            new Thread(workers[i]).start();
        mainBoard.waitForConvergence();
    }
}

(To Be Continued)