Reactive Programming with RxJava: Reactive from Top to Bottom

Truly reactive applications use streams pretty much from top to bottom. This approach simplifies reasoning and makes our application very consistent. Nonblocking applications tend to provide great performance and throughput for a fraction of the hardware. By limiting the number of threads, we are able to fully utilize CPU without consuming gigabytes of memory.

One of the limiting factors of scalability in Java is the I/O mechanism. The java.io package is very well designed with lots of small Input/OutputStream and Reader/Writer implementations that decorate and wrap one another, adding one functionality at a time.

As much as I like this beautiful separation of concerns, standard I/O in Java is entirely blocking, meaning every thread that wishes to read or write from a Socket or File must wait indefinitely for the result. Even worse, threads stuck at an I/O operation due to a slow network or an even slower spinning disk are hard to interrupt. Blocking on its own is not an issue: when one thread is blocked, others can still interact with the remaining open Sockets. But threads are expensive to create and manage, and switching between them takes time. Java applications are perfectly capable of handling tens of thousands of concurrent connections, but you must design them carefully. This design effort is greatly reduced when RxJava is combined with some modern event-driven libraries and frameworks.

Beating the C10k Problem

The C10k problem was an area of research and optimization that tried to achieve 10,000 concurrent connections on a single commodity server. Even these days, solving this engineering task with the traditional Java toolkit is a challenge.

We will go through several examples of a simple HTTP server and observe how it behaves with respect to the design choices we made. Admittedly, performance and scalability do have a complexity price tag. But with RxJava the additional complexity will be reduced significantly.

The classic thread per connection model struggles to solve the C10k problem. With 10,000 threads we do the following:

  • Consume several gigabytes of RAM for stack space
  • Put great pressure on the garbage collection mechanism, even though stack space itself is not garbage collected (lots of GC roots and live objects)
  • Waste a significant amount of CPU time simply switching cores to run different threads (context switching)

The classic thread-per-Socket model served us really well, and as a matter of fact it works quite well in many applications to this day. However, after you reach a certain level of concurrency, the number of threads becomes dangerous. A thousand concurrent connections handled by a single commodity server is not something unusual, especially with long-living TCP/IP connections like HTTP with a Keep-Alive header, server-sent events, or WebSockets. However, each thread occupies a little bit of memory (stack space), regardless of whether it is computing something or just waiting idle for data.

There are two independent approaches to scalability: horizontal and vertical. To handle more concurrent connections we can simply spin up more servers, each managing a subset of the load. This requires a frontend load-balancer and does not solve the original C10k problem, which expects just one server. Vertical scalability, on the other hand, means purchasing bigger and more capable servers. However, with blocking I/O we need a disproportionate amount of memory compared to the heavily underutilized CPU.

Even if a big enterprise server can handle hundreds of thousands of concurrent connections (at a very high price), it is far from solving the C10M problem: ten million concurrent connections. This number is not a coincidence; a couple of years ago, a properly designed Java application reached that enormous level on a typical server.

This chapter takes you on a journey through different ways of implementing an HTTP server. From single-threaded servers, through thread pools, to entirely event-driven architectures. The idea behind this exercise is to compare the implementation complexity versus performance and throughput. In the end, you will notice that the version using RxJava combines both relative simplicity and outstanding performance.

Traditional Thread-Based HTTP Servers

The purpose of this section is to compare how blocking servers, even when written properly, behave under high load. This is the exercise that we probably all went through during our education: writing a server on top of raw sockets. We will be implementing an extremely simple HTTP server that responds with 200 OKs for every request. As a matter of fact, for the sake of simplicity we will ignore the request altogether.

SINGLE THREADED SERVER

The simplest implementation just opens a ServerSocket and handles client connections as they come. When a single client is served, all other requests are queued up. The following code snippet is actually very simple:

class SingleThread {

    public static final byte[] RESPONSE = (
            "HTTP/1.1 200 OK\r\n" +
            "Content-length: 2\r\n" +
            "\r\n" +
            "OK").getBytes();

    public static void main(String[] args) throws IOException {
        final ServerSocket serverSocket = new ServerSocket(8080, 100);
        while (!Thread.currentThread().isInterrupted()) {
            final Socket client = serverSocket.accept();
            handle(client);
        }
    }

    private static void handle(Socket client) {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                readFullRequest(client);
                client.getOutputStream().write(RESPONSE);
            }
        } catch (Exception e) {
            e.printStackTrace();
            IOUtils.closeQuietly(client);
        }
    }

    private static void readFullRequest(Socket client) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(client.getInputStream()));
        String line = reader.readLine();
        while (line != null && !line.isEmpty()) {
            line = reader.readLine();
        }
    }
}

For each request we ignore whatever was sent to us and return a 200 OK response. Opening localhost:8080 in the browser succeeds with an OK text reply. The class is named SingleThread for a reason. ServerSocket.accept() blocks until any client establishes a connection with us. Then, it returns a client Socket. While we interact with that Socket (read and write to it), we still listen for incoming connections, but no one picks them up because our only thread is busy handling the first client.

Now, coming back to our client connection, we first must read the entire request and then write the response. Both of these operations are potentially blocking and subject to network slowness and congestion. If one client establishes a connection but then waits a few seconds before sending a request, all other clients must wait. Having just a single thread for handling all incoming connections is clearly not very scalable; we barely solved the C1 (one concurrent connection) problem.

ThreadPerConnection shows how to implement a blocking server that creates a new thread for each client connection. This presumably scales quite well, but such an implementation suffers from the same problems as fork() in C: starting a new thread takes some time and resources, which is especially wasteful for short-lived connections. Moreover, there is no limit on the maximum number of client threads running at the same time. And when you do not put a limit on something in a computer system, that limit will be applied for you in the worst and least expected place. For example, our program will become unstable and eventually crash with OutOfMemoryError in the case of thousands of concurrent connections.
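The ThreadPerConnection listing itself is not reproduced in this excerpt; a minimal sketch of the idea, reusing the handle() method from the SingleThread listing above (assumed accessible here), could look like this:

class ThreadPerConnection {
    public static void main(String[] args) throws IOException {
        final ServerSocket serverSocket = new ServerSocket(8080, 100);
        while (!Thread.currentThread().isInterrupted()) {
            final Socket client = serverSocket.accept();
            // One brand-new thread per accepted connection; nothing caps how many exist at once
            new Thread(() -> handle(client)).start();
        }
    }
}

Every accept() immediately hands the socket off to a fresh thread, so slow clients no longer block the accept loop, but the thread creation cost and the unbounded thread count remain.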

ThreadPool also uses a thread per connection, but threads are recycled when a client disconnects so that we do not pay the price of thread warm-up for every client. This is pretty much how all popular servlet containers like Tomcat and Jetty work, managing 100 to 200 threads in a pool by default. Tomcat has the so-called NIO connector that handles some of the operations on sockets asynchronously, but the real work in servlets and in frameworks built on top of them is still blocking. This means that traditional applications are inherently limited to a couple of thousand connections, even when built on top of modern servlet containers.
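The full ThreadPool implementation is recapped later in this chapter (it builds a ThreadPoolExecutor with a bounded queue and a rejection handler); a simplified sketch of the idea, again assuming the handle() method from the SingleThread listing, could look like this:

class ThreadPool {
    public static void main(String[] args) throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(100);
        final ServerSocket serverSocket = new ServerSocket(8080, 100);
        while (!Thread.currentThread().isInterrupted()) {
            final Socket client = serverSocket.accept();
            // Handling is still blocking, but the 100 pooled threads are recycled across connections
            executor.submit(() -> handle(client));
        }
    }
}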

Nonblocking HTTP Server with Netty and RxNetty

We will now focus on event-driven approaches to writing an HTTP server, which are far more promising in terms of scalability. A blocking processing model involving thread-per-request clearly does not scale. We need a way of managing several client connections with just a handful of threads. This has a lot of benefits:

  • Reduced memory consumption
  • Better CPU and CPU cache utilization
  • Greatly improved scalability on a single node

One caveat is the lost simplicity and clarity. Threads are not allowed to block on any operation, so we can no longer pretend that receiving or sending data over the wire is the same as a local method invocation. The latency is unpredictable and response times are higher by orders of magnitude. By the time you read this, there will probably still be quite a few spinning hard drives out there, which are even slower than a local area network.

In this section, we will develop a tiny event-driven application with the Netty framework and later refactor it to use RxNetty. Finally, we conclude with a benchmark of all solutions.

Netty is entirely event-driven; we never block waiting for data to be sent or received. Instead, raw bytes in the form of ByteBuf instances are pushed to our processing pipeline.

TCP/IP gives us the impression of a connection and of data flowing byte after byte between two computers. But in reality, TCP/IP is built on top of IP, which merely transfers chunks of data known as packets. It is the operating system’s role to assemble them in the correct order and give the illusion of a stream. Netty drops this abstraction and works at a byte-sequence layer, not a stream. Whenever a few bytes arrive at our application, Netty notifies our handler. Whenever we send a few bytes, we get a ChannelFuture without blocking (more on futures in a second).

Our example of non-blocking HTTP server has three components. The first simply starts the server and sets up the environment:

class HttpTcpNettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            new ServerBootstrap()
                .option(ChannelOption.SO_BACKLOG, 50_000)
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HttpInitializer())
                .bind(8080)
                .sync()
                .channel()
                .closeFuture()
                .sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

This is the most basic HTTP server in Netty. The crucial parts are the bossGroup pool, responsible for accepting incoming connections, and the workerGroup, which processes events. These pools are not very big: one thread for bossGroup and close to the number of CPU cores for workerGroup, but this is more than enough for a well-written Netty server. We have not yet specified what the server should do, apart from listening on port 8080. This is configurable via a ChannelInitializer:

class HttpInitializer extends ChannelInitializer<SocketChannel> {

    private final HttpHandler httpHandler = new HttpHandler();

    @Override
    public void initChannel(SocketChannel channel) {
        channel
            .pipeline()
            .addLast(new HttpServerCodec())
            .addLast(httpHandler);
    }
}

Rather than providing a single function that handles the connection, we build a pipeline that processes incoming ByteBuf instances as they arrive. The first step of the pipeline decodes raw incoming bytes into higher-level HTTP request objects. This handler is built-in. It is also used for encoding the HTTP response back to raw bytes. In more robust applications you will often see more handlers focused on smaller functionality; for example, frame decoding, protocol decoding, security, and so on. Every piece of data and notification flows through this pipeline.

The second step of our pipeline is the business logic component that actually handles the request rather than just intercepting or enriching it. Although HttpServerCodec is inherently stateful (it translates incoming packets to high-level HttpRequest instances), our custom HttpHandler can be a stateless singleton:

@Sharable
class HttpHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            sendResponse(ctx);
        }
    }

    private void sendResponse(ChannelHandlerContext ctx) {
        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.wrappedBuffer("OK".getBytes(UTF_8)));
        response.headers().add("Content-length", 2);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Error", cause);
        ctx.close();
    }
}

After constructing the response object, we write() back a DefaultFullHttpResponse. However, write() does not block like it does on ordinary sockets. Instead, it returns a ChannelFuture to which we can subscribe via addListener() and asynchronously close the channel:

ctx
    .writeAndFlush(response)
    .addListener(ChannelFutureListener.CLOSE);

Channel is an abstraction over a communication link (for example, an HTTP connection); therefore, closing a channel closes the connection. Again, we do not want to do that here, because we want to keep connections persistent.

Netty uses just a handful of threads to process possibly thousands of connections. We do not keep any heavyweight data structures or threads per connection. This is much closer to what actually happens close to the metal: the computer receives an IP packet and wakes up the process listening on the destination port.

TCP/IP connections are just an abstraction, often implemented using threads. However, when the application is much more demanding in terms of load and the number of connections, operating directly at the packet level is much more robust. We still have channels (a lightweight representation of a connection) and pipelines with possibly stateful handlers.

OBSERVABLE SERVER WITH RXNETTY

Netty is an important backbone behind plenty of successful products and frameworks such as Akka, Elasticsearch, HornetQ, Play framework, Ratpack, and Vert.x, to name a few. There is also a thin wrapper around Netty, RxNetty, that bridges between its API and RxJava. Let’s rewrite the nonblocking Netty server into RxNetty. But we will begin with a simple currency server to become familiar with the API:

class EurUsdCurrencyTcpServer {

    private static final BigDecimal RATE = new BigDecimal("1.06448");

    public static void main(final String[] args) {
        TcpServer
            .newServer(8080)
            .<String, String>pipelineConfigurator(pipeline -> {
                pipeline.addLast(new LineBasedFrameDecoder(1024));
                pipeline.addLast(new StringDecoder(UTF_8));
            })
            .start(connection -> {
                Observable<String> output = connection
                    .getInput()
                    .map(BigDecimal::new)
                    .flatMap(eur -> eurToUsd(eur));
                return connection.writeAndFlushOnEach(output);
            })
            .awaitShutdown();
    }

    static Observable<String> eurToUsd(BigDecimal eur) {
        return Observable
            .just(eur.multiply(RATE))
            .map(amount -> eur + " EUR is " + amount + " USD\n")
            .delay(1, TimeUnit.SECONDS);
    }
}

This is a self-sufficient, standalone TCP/IP server written on top of RxNetty. You should have a rough understanding of its major parts. First, we create a new TCP/IP server listening on port 8080. Netty provides a rather low-level abstraction of ByteBuf messages flowing through a pipeline, and we must configure such a pipeline as well. The first handler rearranges (splits and joins when needed) ByteBuf sequences into sequences of lines using the built-in LineBasedFrameDecoder. Second, StringDecoder transforms each ByteBuf containing a full line into an actual String instance. From this point on, we are working exclusively with Strings.

Every time a new connection arrives, the callback is executed. The connection object allows us to asynchronously send and receive data. First, we begin with connection.getInput(). This object is of type Observable<String> and emits a value every time a new line of the client’s request appears on the server. The getInput() Observable notifies us asynchronously about new input. First, we parse the String into BigDecimal. Then, using the helper method eurToUsd(), we fake calling some currency exchange service. To make the example more realistic, we artificially applied delay() so that we must wait a little bit for the response. Obviously delay() is asynchronous and does not involve any sleeping. In the meantime, we keep receiving and transforming requests along the way.

After all these transformations the output Observable is fed directly into writeAndFlushOnEach(). I believe this is quite understandable—we receive a sequence of inputs, transform them, and use the transformed sequence as a sequence of outputs. Now, let’s interact with this server using telnet. Notice how some responses appear after several requests were consumed due to faked currency server latency:

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
2.5
2.5 EUR is 2.661200 USD
0.99
0.99 EUR is 1.0538352 USD
0.94
0.94 EUR is 1.0006112 USD
20
30
40
20 EUR is 21.28960 USD
30 EUR is 31.93440 USD
40 EUR is 42.57920 USD

We treat our server like a function of request data into response data. Because the TCP/IP connection is not just a simple function but a stream of sometimes interdependent chunks of data, RxJava works amazingly well in this scenario. A rich set of operators makes it easy to transform input to output in nontrivial ways. Of course, the output stream does not have to be based on input; for example, if you are implementing server-sent events, the server simply publishes data irrespective of incoming data.
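As a hedged illustration of that push-only scenario, a sketch reusing the same RxNetty TcpServer API as EurUsdCurrencyTcpServer (the class name here is hypothetical) could ignore the input entirely and emit a timer-driven message to every connected client:

class TickerTcpServer {
    public static void main(String[] args) {
        TcpServer
            .newServer(8080)
            .<String, String>pipelineConfigurator(pipeline -> {
                pipeline.addLast(new LineBasedFrameDecoder(1024));
                pipeline.addLast(new StringDecoder(UTF_8));
            })
            .start(connection -> {
                // The output is driven by a timer, not by anything the client sends
                Observable<String> ticks = Observable
                    .interval(1, TimeUnit.SECONDS)
                    .map(tick -> "tick " + tick + "\n");
                return connection.writeAndFlushOnEach(ticks);
            })
            .awaitShutdown();
    }
}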

The EurUsdCurrencyTcpServer is reactive because it only acts when data comes in. We do not have a dedicated thread per each client. This implementation can easily withstand thousands of concurrent connections, and vertical scalability is limited only by the amount of traffic it must handle, not the number of more-or-less idle connections.

Knowing how RxNetty works in principle, we can go back to the original HTTP server that returns OK responses. RxNetty has built-in support for HTTP clients and servers, but we will begin from a plain implementation based on TCP/IP:

class HttpTcpRxNettyServer {

    public static final Observable<String> RESPONSE = Observable.just(
            "HTTP/1.1 200 OK\r\n" +
            "Content-length: 2\r\n" +
            "\r\n" +
            "OK");

    public static void main(final String[] args) {
        TcpServer
            .newServer(8080)
            .<String, String>pipelineConfigurator(pipeline -> {
                pipeline.addLast(new LineBasedFrameDecoder(128));
                pipeline.addLast(new StringDecoder(UTF_8));
            })
            .start(connection -> {
                Observable<String> output = connection
                    .getInput()
                    .flatMap(line -> {
                        if (line.isEmpty()) {
                            return RESPONSE;
                        } else {
                            return Observable.empty();
                        }
                    });
                return connection.writeAndFlushOnEach(output);
            })
            .awaitShutdown();
    }
}

With EurUsdCurrencyTcpServer in mind, understanding HttpTcpRxNettyServer should be fairly simple. Because for educational purposes we always return static 200 OK responses, there is no point in parsing the request. However, a well-behaving server should not send a response before it has read the entire request. Therefore, we begin by looking for an empty line in getInput(), marking the end of the HTTP request. Only then do we produce the 200 OK line. The output Observable built this way is passed to connection.writeAndFlushOnEach(). In other words, the response is sent to the client as soon as the request contains the first empty line.

Implementing an HTTP server using TCP/IP is an entertaining exercise that helps you to understand the intricacies of HTTP. Luckily, we are not forced to implement HTTP and RESTful web services using TCP/IP abstraction all the time. Similar to Netty, RxNetty also has a bunch of built-in components to serve HTTP:

class RxNettyHttpServer {

    private static final Observable<String> RESPONSE_OK =
            Observable.just("OK");

    public static void main(String[] args) {
        HttpServer
            .newServer(8086)
            .start((req, resp) ->
                resp
                    .setHeader(CONTENT_LENGTH, 2)
                    .writeStringAndFlushOnEach(RESPONSE_OK)
            ).awaitShutdown();
    }
}

If you are bored with just returning a static 200 OK, we can build a nonblocking RESTful web service with relative ease, again for currency exchange:

class RestCurrencyServer {

    private static final BigDecimal RATE = new BigDecimal("1.06448");

    public static void main(final String[] args) {
        HttpServer
            .newServer(8080)
            .start((req, resp) -> {
                String amountStr = req.getDecodedPath().substring(1);
                BigDecimal amount = new BigDecimal(amountStr);
                Observable<String> response = Observable
                    .just(amount)
                    .map(eur -> eur.multiply(RATE))
                    .map(usd ->
                        "{\"EUR\": " + amount + ", " +
                        "\"USD\": " + usd + "}");
                return resp.writeString(response);
            })
            .awaitShutdown();
    }
}

We can interact with this server using a web browser or curl. The initial substring(1) is required to strip the first slash from the request:

$ curl -v localhost:8080/10.99
> GET /10.99 HTTP/1.1
> User-Agent: curl/7.35.0
> Host: localhost:8080
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
<
{"EUR": 10.99, "USD": 11.6986352}

Having a handful of implementations of this simple HTTP server we can compare them in terms of performance, scalability, and throughput. This is the reason why we abandoned the familiar thread-based model and began using RxJava and asynchronous APIs in the first place.

Benchmarking Blocking versus Reactive Server

To illustrate why writing a nonblocking, reactive HTTP server is valuable and pays off, we will run a series of benchmarks against each implementation. Interestingly, the benchmarking tool of our choice, wrk, is also nonblocking; otherwise, it would fail to simulate a load equivalent to tens of thousands of concurrent connections. Another interesting alternative is Gatling, which is built on top of the Akka toolkit. Traditional thread-based load tools like JMeter and ab fail to simulate such excessive load and become a bottleneck themselves.

Every JVM-based implementation was benchmarked against 10,000, 20,000, and 50,000 concurrent HTTP clients, and thus TCP/IP connections. We were interested in the number of requests per second (throughput) as well as the median and 99th percentile response times. Just as a reminder: the median means that 50% of the requests were faster (and 50% slower), whereas the 99th percentile tells us that 1% of the requests were slower than the given value.

PLAIN SERVER RETURNING 200 OKS

The first benchmark compares how various implementations are performing when they simply return 200 OKs and perform no backend tasks. This is a somewhat unrealistic benchmark but it will give us a notion of the server and Ethernet upper limits. In subsequent tests we will add some arbitrary sleep inside every server.

[Figure: requests per second versus the number of concurrent connections for servers returning plain 200 OK responses]

Keep in mind that this benchmark is just a warm-up before real scenarios involving some work on the server side. But we can already see a few interesting trends:

  • Netty and RxNetty-based implementations using raw TCP/IP have the best throughput, almost reaching 200,000 requests per second
  • Unsurprisingly, SingleThread implementation is significantly slower, being able to handle about 6,000 requests per second, irrespective of the concurrency level
  • However, SingleThread is the fastest implementation when there is just one client. The overhead of thread pools, event-driven (Rx)Netty, and pretty much any other implementation is visible. This advantage quickly diminishes as the number of clients grows. Moreover, the throughput of the server is highly dependent on the performance of the client
  • Surprisingly, ThreadPool performs really well, but it becomes unstable under high load (lots of errors reported by wrk) and fails entirely when confronted with 50,000 concurrent clients (10-second timeout reached)
  • ThreadPerConnection is also performing very well, but above 100–200 threads the server quickly drops throughput. Also, 50,000 threads put a lot of pressure on the JVM; the few extra gigabytes of stack space alone are troublesome

SIMULATING SERVER-SIDE WORK

To simulate some work on the server-side, we will simply inject sleep() invocation in between request and response. This is fair: often servers are not performing any CPU-intensive work to fulfill a request. Traditional servers block on external resources, consuming one thread. Reactive servers, on the other hand, simply wait for an external signal (like event or message containing response), releasing underlying resources in the meantime.

For that reason, for blocking implementations we simply added sleep(), whereas for nonblocking servers we use Observable.delay() and similar operators to simulate the nonblocking, slow response of some external service, as demonstrated in the following example:

public static final Observable<String> RESPONSE = Observable.just(
        "HTTP/1.1 200 OK\r\n" +
        "Content-length: 2\r\n" +
        "\r\n" +
        "OK")
    .delay(100, MILLISECONDS);
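
For the blocking servers, the corresponding change is a plain sleep() injected into the handler. A sketch based on the handle() method from the SingleThread listing could look like this:

private static void handle(Socket client) {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            readFullRequest(client);
            // Simulated downstream latency: this thread is parked and unusable for 100 ms
            TimeUnit.MILLISECONDS.sleep(100);
            client.getOutputStream().write(RESPONSE);
        }
    } catch (Exception e) {
        IOUtils.closeQuietly(client);
    }
}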

There was no point in using a nonblocking delay in blocking implementations because they would still have to wait for the response, even if the underlying implementation was nonblocking. That being said, we injected a 100-millisecond delay into each request so that each interaction with the server takes at least a tenth of a second. The benchmark is now much more realistic and interesting. The number of requests per second versus client connections is shown in the following graphic:

[Figure: requests per second versus the number of concurrent connections, with 100 ms of simulated work per request]

The results more closely follow what one could expect from a real life load. The two Netty-based implementations (HttpTcpNettyServer and HttpTcpRxNettyServer) on the top are by far the fastest, easily reaching 90,000 requests per second (RPS). As a matter of fact, up until about 10,000 concurrent clients, the server scales linearly. It is very simple to prove: one client generates about 10 RPS (each request takes around 100 milliseconds, so 10 requests fit in 1 second). Two clients generate up to 20 RPS, 5 clients up to 50 RPS, and so on. At about 10,000 concurrent connections we should expect 100,000 RPS and we are close to that theoretical limit (90,000 RPS).

On the bottom, we see the SingleThread and ThreadPool servers. Their performance results are miserable, which does not come as a surprise. With one thread processing requests and each request taking at least 100 milliseconds, the server clearly cannot handle more than 10 RPS. ThreadPool is much better: having 100 threads, each processing 10 RPS, it totals at 1,000 RPS. These results are worse by a few orders of magnitude compared to the reactive Netty and RxJava implementations.

Also, the SingleThread implementation was rejecting almost every request under high load. At around 50,000 concurrent connections, it accepted only a marginal number of requests and almost never managed to respond within the 10-second timeout imposed by wrk.

You might ask, why restrict ThreadPool to just 100 threads? This number is similar to what popular HTTP servlet containers default to, but surely we can specify more. Because all connections are persistent and keep a thread from the pool busy for the duration of the entire connection, you can treat ThreadPerConnection like a thread pool with an unlimited number of threads. Surprisingly, such an implementation works quite well, even when the JVM must manage 50,000 concurrent threads, each representing one connection.

As a matter of fact, ThreadPerConnection is not much worse than RxNettyHttpServer. It turns out that throughput measured in RPS is not sufficient; we must also look at the response times for each individual request. It depends on your requirements, but typically you need both great throughput to utilize the server and low response times so that perceived performance is great, as well.

Average response time is rarely a good indicator. On one hand, the average hides outliers (the few requests that are unacceptably slow); on the other, the typical response time (the one observed by most clients) is smaller than the average, again due to outliers. Percentiles proved to be much more indicative, effectively describing the distribution of a particular value. The following diagram shows the 99th percentile of response time for each implementation versus the number of concurrent connections (or clients). The value on the Y axis tells us that 99% of the requests were faster than a given value. Obviously, we want these numbers to be as low as possible (but they cannot be lower than the 100 milliseconds of simulated delay) and to grow as little as possible with increasing load, as depicted in the following chart:

Typical response time

The ThreadPerConnection implementation stands out badly. Up to 1,000 concurrent connections, all implementations go side by side. But at some point ThreadPerConnection becomes really slow to respond, several times slower than its competitors. There are a couple of primary reasons for that: first, excessive context switches between thousands of threads, and second, more frequent garbage collection cycles. Basically, the JVM spends a lot of time on housekeeping and not much is left for actual work. Thousands of concurrent connections are sitting idle, waiting for their turn.

You might be surprised that the ThreadPool implementation has such an outstanding 99th percentile response time. It outperforms all other implementations and remains stable even under high load. Let’s quickly recap what the implementation of ThreadPool looked like:

BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(1000);
ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, MILLISECONDS, workQueue,
        (r, ex) -> {
            // ClientConnection is the Runnable wrapping one client connection (not shown here)
            ((ClientConnection) r).serviceUnavailable();
        });

Rather than using the Executors builder, we built ThreadPoolExecutor directly, taking control of workQueue and RejectedExecutionHandler. The latter is executed when the former runs out of space. Basically, we prevent server overload, ensuring that requests that cannot be served quickly are rejected immediately. No other implementation has a similar safety feature, often called fail-fast.

Error rate

Errors, as reported by the wrk load-testing tool, are nonexistent to marginal for all of the implementations except SingleThread and ThreadPool. This is an interesting trade-off: ThreadPool always responds as soon as possible, much faster than its competitors. However, it is also very eager to reject requests immediately when it is being overwhelmed. Of course, you can implement a similar mechanism on top of a reactive implementation with Netty/RxJava.

Reactive HTTP Servers Tour

TCP/IP, and HTTP built on top of it, are inherently event driven. Despite providing the illusion of input and output pipes, underneath there are packets of data arriving asynchronously. As with nearly every abstraction in computer science, treating the network stack as a blocking stream of bytes becomes leaky, especially when we want to take full advantage of the hardware.

Classic approaches to networking are just fine, even under moderate load. But to scale up to the limits unheard of in traditional Java applications, you must go reactive. Although Netty is a wonderful framework for building reactive, event-driven network applications, it is rarely used directly. Instead, it is part of a wide range of libraries and frameworks, including RxNetty. RxNetty is especially interesting because it combines the power of event-driven networking with the simplicity of RxJava’s operators. We still treat network communication as a flow of messages (packets) but abstracted away with Observable<ByteBuf>.

Remember how we defined the problem of 10,000 concurrent connections in “Beating the C10k Problem”? We managed to solve this problem using numerous Netty and RxNetty implementations. As a matter of fact, we successfully implemented servers that withstood C50k, handling 50,000 concurrent persistent HTTP connections. With more client hardware (because the server was doing just fine) and less frequent requests going through the wire, the same implementations could easily survive C100k and more, using about a dozen lines of code.

Obviously, implementing the server part of HTTP (or any other protocol for that matter; HTTP was chosen due to its ubiquity) is just one side of the story. It is equally important what the server is doing, and most of the time it becomes a client of another server. So far in this chapter, we focused on reactive, nonblocking HTTP servers. This is reasonable, but there are multiple, sometimes surprising places where blocking code can sneak in. First of all we paid a lot of attention to the server side, whereas we entirely skipped the client part.

But modern servers, especially in large distributed systems, serve the role of client, as well, requesting and pushing data to many downstream services. It is safe to assume that a single request to a popular search engine can span hundreds or even thousands of downstream components, thus making plenty of client requests. It is obvious that if these requests were blocking and sequential, the search engine’s response time would be unbearably slow.

No matter how perfectly we implement the server’s infrastructure code, if it still has to deal with blocking APIs, the scalability will be harmed, just like our benchmarks have shown. There are a few known sources of blocking in the Java ecosystem in particular, which we will explore briefly.

HTTP Client Code

Servers that simply make several requests to downstream services and combine the responses together are not unheard of. In fact, you can probably find dozens of startups that managed to cleverly mash up several available data sources and provide a valuable service simply on top of that. Today’s APIs are mostly RESTful with SOAP playing a diminishing role—but both based on the ever-prevalent HTTP.

Even a single blocking request can bring a server down, significantly degrading performance. Fortunately, there is a wide range of mature HTTP clients that are nonblocking and we already met the first one: Netty. There are two classes of problems that a nonblocking HTTP client tries to solve:

  • Large number of independent concurrent requests, each requiring a few client calls to third-party APIs. This is typical for service-oriented architectures for which one request spans multiple services
  • A server is making a large number of HTTP client requests, probably during batch operations. Think about web crawlers or indexing services that constantly keep thousands of connections open

Regardless of the nature of the server, the problem remains the same: maintaining large (tens of thousands and more) open HTTP connections introduces significant overhead. This is especially painful when services we connect to (this time as a client) are slow, therefore requiring us to hold the resources for a long time.

In contrast, a TCP/IP connection is actually quite lightweight. The operating system must keep a socket descriptor for each open connection (around one kilobyte) and that is pretty much it. When a packet (message) arrives, the kernel dispatches it to the appropriate process, like the JVM. One kilobyte is quite a small memory footprint compared to the roughly one megabyte consumed by the stack of each thread blocked on a socket. That being said, the classic thread-per-connection model does not scale in the case of high-performance servers, and we need to embrace the underlying networking model rather than trying to abstract it away with blocking code. The good news is that RxJava + Netty provide a much better abstraction, still relatively close to the metal.

Nonblocking HTTP Client with RxNetty

RxJava together with Netty provides an abstraction that is sufficiently close to the way networks work. Rather than pretending that an HTTP request is almost like an ordinary method call within JVM, it embraces asynchrony.

Moreover, we can no longer pretend that HTTP is just a request–response protocol. The emergence of server-sent events (one request, multiple responses), WebSockets (full-duplex communication), and finally HTTP/2 (many parallel requests and responses over the same wire, interleaving with one another) reveals many different usage scenarios for HTTP.

RxNetty on the client side provides quite a concise API for the simplest use cases. You make a request and get back easily composable Observables:

Observable<ByteBuf> response = HttpClient
    .newClient("example.com", 80)
    .createGet("/")
    .flatMap(HttpClientResponse::getContent);

response
    .map(bb -> bb.toString(UTF_8))
    .subscribe(System.out::println);

Calling the createGet() method returns a subclass of Observable<HttpClientResponse>. Obviously, the client does not block waiting for the response, so the Observable seems like a good choice. But this is just the beginning. HttpClientResponse itself has a getContent() method that returns Observable<ByteBuf>.

If you recall from “Nonblocking HTTP Server with Netty and RxNetty”, ByteBuf is an abstraction over a chunk of data received over the wire. From the client perspective, this is part of the response. That is right, RxNetty goes a bit further compared to other nonblocking HTTP clients and does not simply notify us when the entire response arrives. Instead, we get a stream of ByteBuf messages, optionally followed by Observable completion when the server decides to drop the connection.

Such a model is much closer to how the TCP/IP stack works and scales better in terms of use cases. It can work with a simple request/response flow as well as complex streaming scenarios. But beware that even in case of a single response — for example, one containing HTML — it will most likely arrive in multiple chunks.

Of course, RxJava has plenty of ways to assemble them back such as Observable.toList() or Observable.reduce(). But it is your choice: if you want to consume data as it comes in small bits, that is absolutely fine. In that regard, RxNetty is quite low-level, but because the abstraction does not impose a major performance bottleneck like excessive buffering or blocking, it turns out to be extremely scalable. If you are looking for a reactive and robust but more high-level HTTP client, see “Retrofit with Native RxJava Support”.
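For instance, a minimal sketch of collapsing the chunked content back into a single String, assuming the response Observable<ByteBuf> from the earlier client example, could look like this:

Observable<String> fullBody = response
    .map(bb -> bb.toString(UTF_8))           // decode each chunk as it arrives
    .toList()                                // wait until the content stream completes
    .map(chunks -> String.join("", chunks)); // reassemble the chunks in order

fullBody.subscribe(System.out::println);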

As opposed to callback-based reactive APIs, RxNetty plays very nicely with other Observables: you can easily parallelize, combine, and split work. For example, imagine that you have a stream of URLs to which you must connect and consume data in real time. This stream can be fixed (built from a simple List<URL>) or dynamic, with new URLs appearing all the time. If you want a steady stream of packets flowing through all of these sources, you can simply flatMap() over them:

Observable<URL> sources = //...

Observable<ByteBuf> packets =
    sources
        .flatMap(url -> HttpClient
            .newClient(url.getHost(), url.getPort())
            .createGet(url.getPath()))
        .flatMap(HttpClientResponse::getContent);

This is a slightly contrived example because it mixes together ByteBuf messages from different sources, but you get the idea. For each URL in the upstream Observable, we produce an asynchronous stream of ByteBuf instances from that URL. If you want to first transform incoming data, perhaps by combining chunks of data into a single event — you can do this easily, for example with reduce().

Here is the upshot: you can easily have tens of thousands of open HTTP connections, idle or receiving data. The limiting factor is no longer memory, but the processing power of your CPU and network bandwidth. JVM does not need to consume gigabytes of memory to process a reasonable number of transactions.

HTTP APIs are one of the major bottlenecks in modern applications. They are not expensive in terms of CPU, but blocking HTTP that behaves like an ordinary procedural call substantially limits scalability. And even if you carefully remove blocking HTTP communication, synchronous code can appear in the most surprising places.

A notorious pitfall of the equals() method in java.net.URL is that it can make a network request. That’s right: when you compare two instances of the URL class, this seemingly fast method can make a network roundtrip (call sequence, read top to bottom):

java.net.URL.equals(URL.java)
java.net.URLStreamHandler.equals(URLStreamHandler.java)
java.net.URLStreamHandler.sameFile(URLStreamHandler.java)
java.net.URLStreamHandler.hostsEqual(URLStreamHandler.java)
java.net.URLStreamHandler.getHostAddress(URLStreamHandler.java)
java.net.InetAddress.getByName(InetAddress.java)
java.net.InetAddress.getAllByName(InetAddress.java)
java.net.InetAddress.getAllByName0(InetAddress.java)
java.net.InetAddress.getAddressesFromNameService(InetAddress.java)
java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java)
[native code]

To determine whether two URLs are equal, the JVM calls lookupAllHostAddr(), which (in native code) calls gethostbyname (or similar), which can make a synchronous request to a DNS server. This can have a disastrous effect when you have just a handful of threads and a few of them are unexpectedly blocked. Remember our RxNetty-based servers? They were using a few dozen threads at most. Another disastrous situation can happen when URL.equals() is invoked frequently, such as when URL instances are kept in a Set<URL>. This unexpected behavior of URL is rather well known, just like the fact that its equals() can actually yield different results depending on Internet connectivity.
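A common workaround, sketched below, is to compare or key on java.net.URI instead, because URI.equals() is a purely textual comparison that never touches the network:

URL u1 = new URL("http://example.com/");
URL u2 = new URL("http://example.com/");
// May trigger a blocking DNS lookup to check whether both hosts resolve to the same address
boolean urlsEqual = u1.equals(u2);

URI v1 = URI.create("http://example.com/");
URI v2 = URI.create("http://example.com/");
// Pure string-based comparison, no I/O involved
boolean urisEqual = v1.equals(v2);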

Relational Database Access

In previous parts, we concluded that every server eventually becomes a client of some other service. Another interesting observation is that pretty much every computer system we’ve had a chance to work with was distributed. When two machines separated by a network cable need to communicate with each other, they are already spatially distributed. Taking that to the extreme, you might even consider every computer a distributed system, with each CPU core’s independent caches that are not always consistent and must synchronize with one another via a message-passing protocol. But let’s stick to the application server versus database server architecture.

The long-existing standard for relational database access in Java is called Java Database Connectivity (JDBC). From the consumer perspective, JDBC provides a set of APIs for communicating with any relational database like PostgreSQL, Oracle Database, and many others. The core abstractions are Connection (TCP/IP, wire connection), Statement (a database query), and ResultSet (view over the database result).

Today, developers very rarely use this API directly because more user-friendly abstractions exist, from lightweight JdbcTemplate in Spring framework, through code generation libraries like jOOQ, to object-relational mapping solutions like JPA. JDBC has a notorious reputation for difficult error handling combined with checked exceptions (much simpler with try-with-resources since Java 7):

try (
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:");
    Statement stat = conn.createStatement();
    ResultSet rs = stat.executeQuery("SELECT 2 + 2 AS total")
) {
    if (rs.next()) {
        System.out.println(rs.getInt("total"));
        assert rs.getInt("total") == 4;
    }
}

The preceding example uses an embedded H2 database, often utilized during integration tests. But in production, you rarely see a database instance running on the same machine as the application. Every interaction with the database requires a network roundtrip. The core part of JDBC is the API, which every database vendor must implement.

When asking the JDBC API for a new Connection, the implementation must make a physical connection to the database by opening a client socket, authorizing, and so on. Databases have different wire protocols (almost universally binary) and the responsibility of the JDBC implementation (also known as Driver) is to translate this low-level network protocol into a consistent API.

This works quite well (putting aside different SQL dialects); unfortunately, when the JDBC standard was released with JDK 1.1 around 1997, nobody predicted how important reactive and asynchronous programming would be two decades later. Surely, the API went through many versions, but all of them are inherently blocking, waiting for each database operation to complete.

This is precisely the same problem as we had with HTTP. You must have as many threads in your application as there are active database operations (queries). JDBC is the only mature standard for accessing the variety of relational databases in a portable way (again, SQL dialect differences put aside). The servlet specification was significantly revamped in version 3.0 several years ago by introducing the HttpServletRequest.startAsync() method. It is too bad that the JDBC standard still holds to the classic blocking model.
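For reference, a minimal sketch of what Servlet 3.0 asynchronous processing looks like (the servlet below and its thread pool are hypothetical, not taken from this chapter):

@WebServlet(urlPatterns = "/async", asyncSupported = true)
public class AsyncServlet extends HttpServlet {

    private final ExecutorService pool = Executors.newFixedThreadPool(10);

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        AsyncContext ctx = req.startAsync();   // releases the container thread immediately
        pool.submit(() -> {
            try {
                ctx.getResponse().getWriter().write("OK");
            } catch (IOException ignored) {
            }
            ctx.complete();                    // finishes the response on a different thread
        });
    }
}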

There are reasons for JDBC to remain blocking. Web servers can easily handle hundreds of thousands of open connections; for example, if they just occasionally stream small bits of data. Database systems, on the other hand, perform several more or less similar steps for each client query:

  1. Query parsing (CPU-bound) translates a String containing a query into a parse tree
  2. Query optimizer (CPU-bound) evaluates the query against various rules and statistics, trying to build an execution plan
  3. Query executor (I/O-bound) traverses database storage and finds appropriate tuples to return
  4. Result set (network-bound) is serialized and pushed back to the client

Clearly, every database needs a lot of resources to perform a query. Typically, the majority of time is actually spent executing the query and disks (spinning or SSD) are not very parallel by design. Therefore, there is a limited amount of concurrent queries that a database system can and should perform until it saturates. This limit largely depends on the actual database engine being used and the hardware on which it’s running.

There are also many other less-obvious aspects like locks, context switches, and CPU cache lines exhaustion. You should expect around a few hundred queries per second. This is very little compared to, for example, the hundreds of thousands of open HTTP connections, easily achievable with nonblocking APIs.

Knowing that the throughput of the database is severely limited by hardware, having fully and entirely reactive drivers does not make that much sense after all. Technically, you can implement a wire protocol on top of Netty or RxNetty and never block the client thread. But knowing that the JVM can handle hundreds to thousands of threads without much hassle (see “Thread per Connection”), there does not seem to be much benefit derived from rewriting the well-established JDBC API from the ground up. Even Slick, from the commonly used Lightbend reactive stack powered by the Akka toolkit, uses JDBC underneath. There are also community-led projects bridging between RxJava and JDBC, such as rxjava-jdbc.

The advice for interacting with relational databases is to actually have a dedicated, well-tuned thread pool and isolate the blocking code there. The rest of your application can be highly reactive and operate on just a handful of threads, but from a pragmatic point of view, just deal with JDBC because trying to replace it with something more reactive could bring a lot of pain for no apparent gain.
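A minimal sketch of that isolation, assuming RxJava 1.x and a hypothetical pool of 20 threads, wraps the blocking JDBC call in Observable.fromCallable() and pins it to a dedicated Scheduler:

ExecutorService jdbcPool = Executors.newFixedThreadPool(20);
Scheduler jdbcScheduler = Schedulers.from(jdbcPool);

Observable<Integer> total = Observable
    .fromCallable(() -> {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:");
             Statement stat = conn.createStatement();
             ResultSet rs = stat.executeQuery("SELECT 2 + 2 AS total")) {
            rs.next();
            return rs.getInt("total");
        }
    })
    // Blocking JDBC work runs only on the dedicated pool, never on event-loop threads
    .subscribeOn(jdbcScheduler);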

NOTIFY AND LISTEN on PostgreSQL Case Study

PostgreSQL has a peculiar built-in messaging mechanism available through the LISTEN and NOTIFY extended SQL statements. Every PostgreSQL client can send a notification to a virtual channel via a SQL statement, as shown here:

NOTIFY my_channel;
NOTIFY my_channel, '{"answer": 42}';

In this example, we send an empty notification followed by an arbitrary string (it can be JSON, XML, or data in any other encoding) to a channel named my_channel. A channel is basically a queue managed inside the PostgreSQL database engine. Interestingly, sending a notification is part of a transaction, so delivery happens after the commit, and in the case of a rollback the message is discarded.

To consume notifications from a particular channel, we first must LISTEN on that channel. When we begin listening on a given connection, the only way to obtain notifications is by periodically polling using the getNotifications() method. This introduces random latency as well as unnecessary CPU load and context switches; unfortunately, that is how the API was designed. The complete blocking example follows:

try (Connection connection =
         DriverManager.getConnection("jdbc:postgresql:db")) {
    try (Statement statement = connection.createStatement()) {
        statement.execute("LISTEN my_channel");
    }
    Jdbc4Connection pgConn = (Jdbc4Connection) connection;
    pollForNotifications(pgConn);
}

//...

void pollForNotifications(Jdbc4Connection pgConn) throws Exception {
    while (!Thread.currentThread().isInterrupted()) {
        final PGNotification[] notifications = pgConn.getNotifications();
        if (notifications != null) {
            for (final PGNotification notification : notifications) {
                System.out.println(
                        notification.getName() + ": " +
                        notification.getParameter());
            }
        }
        TimeUnit.MILLISECONDS.sleep(100);
    }
}

Not only do we block the client thread, we are also forced to keep one JDBC connection open because listening is tied to a particular connection. At least we can listen on many channels at the same time. The preceding code is quite verbose but straightforward. After calling LISTEN, we enter an endless loop asking for new notifications. Calling getNotifications() is destructive, meaning that it discards returned notifications, so calling it twice will not return the same events. getName() is the channel name (for example, my_channel), whereas getParameter() returns optional event contents such as a JSON payload.

The API is horribly old-fashioned, using null to signal no pending notifications, and arrays rather than collections. Let’s make it more Rx-friendly. In the absence of any push-based mechanism for notifications, we are forced to reimplement polling using the nonblocking interval() operator. There are many tiny details that allow our custom Observable to behave properly, which we will discuss further after the example (which is not yet complete):

Observable<PGNotification> observe(String channel, long pollingPeriod) {
    return Observable.<PGNotification>create(subscriber -> {
        try {
            Connection connection = DriverManager
                    .getConnection("jdbc:postgresql:db");
            subscriber.add(Subscriptions.create(() -> closeQuietly(connection)));
            listenOn(connection, channel);
            Jdbc4Connection pgConn = (Jdbc4Connection) connection;
            pollForNotifications(pollingPeriod, pgConn)
                    .subscribe(Subscribers.wrap(subscriber));
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).share();
}

void listenOn(Connection connection, String channel) throws SQLException {
    try (Statement statement = connection.createStatement()) {
        statement.execute("LISTEN " + channel);
    }
}

void closeQuietly(Connection connection) {
    try {
        connection.close();
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

First, we postpone opening a connection to the database until someone actually subscribes. Also, to avoid connection leaks (a serious problem in any application that deals with JDBC directly), we ensure that the connection is closed when the Subscriber unsubscribes. Moreover, when an error occurs in the stream, unsubscription, and therefore closing of the connection, happens as well.

Now we are ready to call listenOn() and begin receiving notifications over an open connection. If an exception is thrown when this statement is executed, it will be caught and handled by calling subscriber.onError(e). Not only does it seamlessly propagate the error to the subscriber but it also forces the closing of the connection. But if the LISTEN request succeeds, the next invocation of getNotifications() will return all events sent afterward.

We do not want to block any thread so instead, we create an inner Observable with interval() inside pollForNotifications(). We subscribe to that Observable with the same Subscriber but wrapped with Subscribers.wrap() so that onStart() is not executed twice on that Subscriber.

Observable<PGNotification> pollForNotifications(
        long pollingPeriod,
        AbstractJdbc2Connection pgConn) {
    return Observable
        .interval(0, pollingPeriod, TimeUnit.MILLISECONDS)
        .flatMap(x -> tryGetNotification(pgConn))
        .filter(arr -> arr != null)
        .flatMapIterable(Arrays::asList);
}

Observable<PGNotification[]> tryGetNotification(
        AbstractJdbc2Connection pgConn) {
    try {
        return Observable.just(pgConn.getNotifications());
    } catch (SQLException e) {
        return Observable.error(e);
    }
}

Periodically, we examine the contents of getNotifications() by first wrapping it in an awkward Observable<PGNotification[]>. Because the returned array PGNotification[] can be null, we then filter() out nulls and via flatMapIterable() unwrap the array, first converting it to a List<PGNotification> with Arrays::asList. The only reason for including closeQuietly() and tryGetNotification() is for handling checked SQLException.

One last tiny bit of implementation is share(), a shorthand for publish().refCount(), close to the end of the first method. These two operators make it possible to share a single JDBC connection among multiple subscribers. Without them, every new subscriber would open a new connection and listen on it, which is quite wasteful. Additionally, refCount() keeps track of the number of subscribers, and when the last one unsubscribes it physically closes the database connection.

Remembering that a single connection can listen on multiple channels, as an exercise try to implement observe() so that it reuses the same connection among all subscribers and all channels in which they are interested. The current implementation shares a connection if you call observe() once and subscribe multiple times, whereas it could easily reuse the same connection all the way down, even for subscribers interested in different channels.

There is no practical reason for using LISTEN and NOTIFY in PostgreSQL as a messaging backbone; there are faster, more robust, and more reliable message queues on the market. But this case study shows how to use JDBC in a more reactive scenario, even when it still requires a little bit of blocking or polling.

CompletableFuture and Streams

Java 8, apart from lambda expressions, the new java.time API, and multiple smaller additions, also brought us the CompletableFuture class. This utility significantly improves upon the Future interface known since Java 5. A plain Future represents an asynchronous operation running in the background, typically obtained from an ExecutorService. However, Future's API is overly simplistic, forcing developers to block on a Future.get() invocation pretty much all the time.

It is not possible to efficiently implement waiting for the first of several Futures to complete without busy waiting. Other options to compose Futures are nonexistent. The following section briefly describes how CompletableFuture works. Later, we will implement a thin interoperability layer between CompletableFuture and Observable.

A Short Introduction to CompletableFuture

CompletableFuture successfully bridges that gap by providing dozens of useful methods, almost all of which are nonblocking and composable. We got used to map() asynchronously transforming input events on the fly. Moreover, Observable.flatMap() allows us to replace a single event with another Observable, chaining asynchronous tasks. Similar operations are possible with CompletableFuture.
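As a small illustration of those counterparts, thenApply() plays the role of map() and thenCompose() the role of flatMap():

CompletableFuture<Integer> length = CompletableFuture
    .supplyAsync(() -> "reactive")                      // runs on ForkJoinPool.commonPool()
    .thenApply(String::length);                         // like Observable.map()

CompletableFuture<Integer> doubled = length
    .thenCompose(len ->
        CompletableFuture.supplyAsync(() -> len * 2));  // like Observable.flatMap()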

Imagine a service that needs two unrelated pieces of information: User and GeoLocation. Knowing both of these, we ask several independent travel agencies to find a Flight, and we book the Ticket with whichever provider is first to return, promoting the fastest and most reactive one. This last requirement is especially difficult to implement, and prior to Java 8 it required ExecutorCompletionService to effectively find the fastest task:

User findById(long id) {
    //...
}

GeoLocation locate() {
    //...
}

Ticket book(Flight flight) {
    //...
}

interface TravelAgency {
    Flight search(User user, GeoLocation location);
}

And usage:

ExecutorService pool = Executors.newFixedThreadPool(10);
List<TravelAgency> agencies = //...

User user = findById(id);
GeoLocation location = locate();

ExecutorCompletionService<Flight> ecs =
        new ExecutorCompletionService<>(pool);
agencies.forEach(agency ->
        ecs.submit(() ->
                agency.search(user, location)));

Future<Flight> firstFlight = ecs.poll(5, SECONDS);
Flight flight = firstFlight.get();
book(flight);

ExecutorCompletionService was never particularly popular among Java developers, and with CompletableFuture it is no longer needed. But first notice how we wrap ExecutorService with ExecutorCompletionService so that we can later poll for completed tasks as they arrive. With a vanilla ExecutorService we would get a bunch of Future objects with no idea which one will complete first, so ExecutorCompletionService was useful. Yet, we still have to sacrifice a thread that blocks waiting for the TravelAgency responses. Also, we do not take advantage of concurrency where it is possible (loading User and GeoLocation at the same time).

Our refactoring will turn all methods into their asynchronous counterparts and later combine CompletableFutures appropriately. This way our code is fully nonblocking (main thread completes almost immediately) and we parallelize as much as possible:

CompletableFuture<User> findByIdAsync(long id) {
    return CompletableFuture.supplyAsync(() -> findById(id));
}

CompletableFuture<GeoLocation> locateAsync() {
    return CompletableFuture.supplyAsync(this::locate);
}

CompletableFuture<Ticket> bookAsync(Flight flight) {
    return CompletableFuture.supplyAsync(() -> book(flight));
}

@Override
public CompletableFuture<Flight> searchAsync(User user, GeoLocation location) {
    return CompletableFuture.supplyAsync(() -> search(user, location));
}

We simply wrapped the blocking methods with an asynchronous CompletableFuture. The supplyAsync() method takes an optional Executor as an argument. If not specified, it uses the globally defined ForkJoinPool.commonPool(). It is advised to always use a custom Executor, but for the purpose of this sample we take advantage of the default one. Just keep in mind that the default is shared among all CompletableFutures, parallel streams, and a few other less obvious places.
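
For completeness, here is a minimal sketch of the overload that accepts an explicit Executor; the pool and its size are made up for this example:

//Hypothetical dedicated pool, sized arbitrarily for illustration
ExecutorService customPool = Executors.newFixedThreadPool(20);

CompletableFuture<User> findByIdAsync(long id) {
    //The two-argument supplyAsync() runs the task on our pool
    //rather than on ForkJoinPool.commonPool()
    return CompletableFuture.supplyAsync(() -> findById(id), customPool);
}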

List<TravelAgency> agencies = //...
CompletableFuture<User> user = findByIdAsync(id);
CompletableFuture<GeoLocation> location = locateAsync();

CompletableFuture<Ticket> ticketFuture = user
        .thenCombine(location, (User us, GeoLocation loc) -> agencies
                .stream()
                .map(agency -> agency.searchAsync(us, loc))
                .reduce((f1, f2) ->
                        f1.applyToEither(f2, identity())
                )
                .get()
        )
        .thenCompose(identity())
        .thenCompose(this::bookAsync);

First, we asynchronously begin fetching User and GeoLocation. These two operations are independent and can run concurrently. However, we need the results of both in order to proceed, of course without blocking and wasting the client thread. This is what thenCombine() does: it takes two CompletableFutures (user and location) and invokes a callback when both are completed, asynchronously. Interestingly, the callback can return a value, which becomes the new content of the resulting CompletableFuture, as demonstrated here:

CompletableFuture<Long> timeFuture = //...
CompletableFuture<ZoneId> zoneFuture = //...

CompletableFuture<Instant> instantFuture = timeFuture
        .thenApply(time -> Instant.ofEpochMilli(time));
CompletableFuture<ZonedDateTime> zdtFuture = instantFuture
        .thenCombine(zoneFuture, (instant, zoneId) ->
                ZonedDateTime.ofInstant(instant, zoneId));

CompletableFuture shares a lot of similarities with Observable. The thenApply() method performs an on-the-fly transformation of whatever the Future brings, just like Observable.map(). In our example, we transform CompletableFuture<Long> to CompletableFuture<Instant> by supplying a function from Long to Instant (Instant::ofEpochMilli). Later, we take two Futures (instantFuture and zoneFuture) and run a transformation on their future values, namely Instant and ZoneId, using the thenCombine() method. This transformation returns ZonedDateTime, but because most of the CompletableFuture operators are nonblocking, we get CompletableFuture<ZonedDateTime> in return, which again is very similar to zip() in Observable.

Going back to the previous example with booking tickets, the following snippet of code is probably quite obscure:

List<TravelAgency> agencies = //...
agencies
        .stream()
        .map(agency -> agency.searchAsync(us, loc))
        .reduce((f1, f2) ->
                f1.applyToEither(f2, identity())
        )
        .get();

We need to start an asynchronous operation on each TravelAgency by calling searchAsync(). We immediately get back a collection of CompletableFuture<Flight> instances; this is a very inconvenient data structure if all we need is the first Future to complete. There are methods like CompletableFuture.allOf() and CompletableFuture.anyOf(). The latter is exactly what we need from a semantic point of view: it takes a group of CompletableFutures and returns a CompletableFuture that completes when the very first underlying CompletableFuture completes, discarding all the others. This is very similar to Observable.amb().

Unfortunately, the syntax of anyOf() is very awkward. First, it accepts an array (varargs), and it always returns CompletableFuture<Object>, not a CompletableFuture of whatever type the underlying Futures had, such as Flight. We can use it, but it becomes quite messy:

.thenCombine(location, (User us, GeoLocation loc) -> {
    List<CompletableFuture<Flight>> fs = agencies
            .stream()
            .map(agency -> agency.searchAsync(us, loc))
            .collect(toList());
    CompletableFuture[] futuresArr = new CompletableFuture[fs.size()];
    fs.toArray(futuresArr);
    return CompletableFuture
            .anyOf(futuresArr)
            .thenApply(x -> ((Flight) x));
})

The trick with Stream.reduce() is as follows. There exists an applyToEither() operator that, called on one CompletableFuture with another as an argument, applies a given transformation to whichever of the two completes first. applyToEither() is extremely useful when you have two homogeneous tasks and only care about the one that completes first. In the following example, we query for a User on two different servers: primary and secondary. Whichever finishes first, we apply a simple transformation that extracts the user’s date of birth. The second CompletableFuture is not interrupted, but its result is discarded. Obviously, we end up with CompletableFuture<LocalDate>:

CompletableFuture<User> primaryFuture = //...
CompletableFuture<User> secondaryFuture = //...

CompletableFuture<LocalDate> ageFuture =
        primaryFuture
                .applyToEither(secondaryFuture,
                        user -> user.getBirth());

applyToEither() can only work on two CompletableFutures, whereas the quirky anyOf() can take an arbitrary number. Fortunately, we can call applyToEither() on the first two Futures, then take the result (fastest of the first two) and combine it with the third Future (yielding the fastest of the first three). By iteratively calling applyToEither(), we get the CompletableFuture representing the fastest overall.
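
To make the folding more tangible, here is the same idea written out by hand for three hypothetical Flight futures:

CompletableFuture<Flight> f1 = //...
CompletableFuture<Flight> f2 = //...
CompletableFuture<Flight> f3 = //...

//Fastest of the first two...
CompletableFuture<Flight> fastestOfTwo = f1.applyToEither(f2, Function.identity());
//...combined with the third one gives the fastest of all three
CompletableFuture<Flight> fastestOfThree = fastestOfTwo.applyToEither(f3, Function.identity());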

This handy trick can be efficiently implemented using the reduce() operator. One last caveat is the identity() method from Function. This is a requirement of applyToEither(); we must provide a transformation that deals with the first result to come. If the result is supposed to be left as-is, we can use an identity function, which can also be written as f -> f or (Flight f) -> f.

Finally, we implemented CompletableFuture<Flight> that completes when the fastest TravelAgency responds, asynchronously. There is still a tiny issue with the result of thenCombine(). Whatever the transformation passed to thenCombine() returns is then wrapped back into CompletableFuture. In our case, we return CompletableFuture<Flight>, so the type of the thenCombine() result is: CompletableFuture<CompletableFuture<Flight>>. Double wrapping is a common issue with Observable as well, and we can use the same trick to fix it in both cases: flatMap()! But remember that just like map() is called thenApply() in Futures, flatMap() is called thenCompose():

Observable<Observable<String>> badStream = //...
Observable<String> goodStream = badStream.flatMap(x -> x);

CompletableFuture<CompletableFuture<String>> badFuture = //...
CompletableFuture<String> goodFuture = badFuture.thenCompose(x -> x);

Normally, we use flatMap()/thenCompose() to chain an asynchronous computation, but here we simply unwrap the incorrect type. Keep in mind that thenCompose() expects the return type of the supplied transformation to be CompletableFuture. But because the internal type is already a Future, using an identity() function, or simply x -> x, fixes the type by unwrapping the internal Future.

Finally, when we have CompletableFuture<Flight> (abbreviated to flightFuture), we can call bookAsync(), which takes a Flight as an argument:

CompletableFuture<Ticket> ticketFuture = flightFuture
        .thenCompose(flight -> bookAsync(flight));

This time, thenCompose() was used more naturally when calling bookAsync(). That method returns CompletableFuture<Ticket>, so to avoid double wrapping, we choose thenCompose() instead of thenApply().

Interoperability with CompletableFuture

The factory method Observable.from(Future<T>) that returns Observable<T> already exists. However, because of the limitations of the old Future<T> API, it has several shortcomings, the biggest one being that it blocks on Future.get() internally. Classic Future<T> implementations have no way of registering callbacks and processing them asynchronously; therefore, they are quite useless in reactive applications.

CompletableFuture, in contrast, is a totally different story. Semantically, you can treat CompletableFuture like an Observable that has the following characteristics:

  • It is hot: the computation behind CompletableFuture starts eagerly, regardless of whether anyone registered callbacks such as thenApply() or not.
  • It is cached: the background computation is triggered once, eagerly, and its result is forwarded to all registered callbacks. Moreover, if a callback is registered after completion, it is immediately invoked with the completed value (or exception); a short demonstration follows this list.
  • It emits exactly one element or exception: in principle, a Future<T> completes exactly once (or never) with a value of type T or an exception. This matches the contract of Observable.
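
The following toy snippet (the value and the sleep duration are arbitrary, purely for demonstration) illustrates the hot and cached behavior just described:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Computing...");   //printed once, immediately, even with no callbacks registered
    return 42;
});

Thread.sleep(1000);   //let the background computation finish (throws the checked InterruptedException)

//Both callbacks are invoked right away with the cached result; nothing is recomputed
future.thenAccept(x -> System.out.println("First: " + x));
future.thenAccept(x -> System.out.println("Second: " + x));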

TURNING COMPLETABLEFUTURE INTO OBSERVABLE WITH SINGLE ITEM

First, we would like to write a utility function that takes a CompletableFuture<T> and returns a properly behaving Observable<T>:

class Util {

    static <T> Observable<T> observe(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            future.whenComplete((value, exception) -> {
                if (exception != null) {
                    subscriber.onError(exception);
                } else {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            });
        });
    }
}

To be notified about both successful and failed completion, we use the CompletableFuture.whenComplete() method. Its callback receives two mutually exclusive parameters. If exception is not null, it means that the underlying Future failed. Otherwise, we take the successful value. In both cases, we notify the incoming subscriber.

Notice that if the subscription appears after the CompletableFuture completed (one way or the other), the callbacks are executed immediately. CompletableFuture caches the result as soon as it completes so that callbacks registered afterward are invoked immediately within the calling thread.

It is tempting to register an unsubscription handler that tries to cancel CompletableFuture in case of unsubscription:

//Don't do this!
subscriber.add(Subscriptions.create(() -> future.cancel(true)));

This is a bad idea. We can create many Observables based on one CompletableFuture, and every Observable can have multiple Subscribers. If just one Subscriber decides to unsubscribe prior to Future’s completion, cancellation will affect all other Subscribers.

Remember that, in Rx terminology, CompletableFuture is hot and cached: it begins computation immediately, whereas Observable will not start computing until someone actually subscribes. With that in mind, the following tiny utility further improves our API:

Observable<User> rxFindById(long id) {
    return Util.observe(findByIdAsync(id));
}

Observable<GeoLocation> rxLocate() {
    return Util.observe(locateAsync());
}

Observable<Ticket> rxBook(Flight flight) {
    return Util.observe(bookAsync(flight));
}

Obviously, if the API you are consuming supports Observable from the beginning, you do not need all these extra layers of adapters. However, if all you have at your disposal are CompletableFutures, converting them to Observables is efficient and safe. The advantage of RxJava is a much more concise implementation of our initial problem:

Observable<TravelAgency> agencies = agencies();
Observable<User> user = rxFindById(id);
Observable<GeoLocation> location = rxLocate();

Observable<Ticket> ticket = user
        .zipWith(location, (us, loc) ->
                agencies
                        .flatMap(agency -> agency.rxSearch(us, loc))
                        .first()
        )
        .flatMap(x -> x)
        .flatMap(this::rxBook);

The client code using the RxJava API seems less noisy and easier to read. Rx naturally supports “futures with multiple values” in the form of streams. If you still find the identity transformation x -> x inside flatMap() a little bit intimidating, you can always split zipWith() using a Pair helper container:

import org.apache.commons.lang3.tuple.Pair;

//...

Observable<Ticket> ticket = user
        .zipWith(location, (usr, loc) -> Pair.of(usr, loc))
        .flatMap(pair -> agencies
                .flatMap(agency -> {
                    User usr = pair.getLeft();
                    GeoLocation loc = pair.getRight();
                    return agency.rxSearch(usr, loc);
                }))
        .first()
        .flatMap(this::rxBook);

At this point, you should understand why the extra x -> x is no longer needed. zipWith() takes two independent Observables and asynchronously waits for both of them. Java has no built-in pairs or tuples, thus we must provide a transformation that takes the events from both streams and combines them into a single Observable<Pair<User, GeoLocation>>. That pair becomes the input for the downstream operators. Later, we use flatMap() to search every travel agency concurrently for a given User and GeoLocation. flatMap() does the unwrapping for us (from a syntactic perspective), so the resulting stream is a simple Observable<Flight>. Naturally, in both cases we use first() to process only the first Flight appearing upstream (the fastest TravelAgency).

FROM OBSERVABLE TO COMPLETABLEFUTURE

In some cases, the API you are using might support CompletableFuture but not RxJava. Such a situation can be quite common, especially taking into account that the former is part of JDK, whereas the latter is a library. Under these circumstances it would be nice to convert Observable into CompletableFuture. There are two ways to implement this transformation:

Observable<T> to CompletableFuture<T>

Use this when you expect just a single item emitted from the stream, for example when Rx wraps a method invocation or a request/response pattern. The CompletableFuture<T> completes successfully when the stream completes with exactly one emitted value. The future completes with an exception when the stream terminates with an error or when it does not emit exactly one item.

Observable<T> to CompletableFuture<List<T>>

In this scenario, the CompletableFuture completes when all events from upstream Observable are emitted and the stream completes. This is just a special case of the first transformation, as you will see later.

You can implement the first scenario easily using the following utility:

static <T> CompletableFuture<T> toFuture(Observable<T> observable) {
    CompletableFuture<T> promise = new CompletableFuture<>();
    observable
            .single()
            .subscribe(
                    promise::complete,
                    promise::completeExceptionally
            );
    return promise;
}

Before diving into the implementation, keep in mind that this transformation has an important side effect: it subscribes to Observable, thus forcing evaluation and computation of cold Observables. Moreover, each invocation of this transformer will subscribe again; it is just a design choice that you must be aware of.

Apart from that, the implementation is quite interesting. First, we force the Observable to emit precisely one element using single(), which throws an exception otherwise. If this single event is emitted and the stream completes, we invoke CompletableFuture.complete(). It turns out one can create a CompletableFuture from scratch, without any backing thread pool or asynchronous task. It is still a CompletableFuture, but the only way to complete it and signal all registered callbacks is by calling complete() explicitly. This is an efficient way of exchanging data asynchronously, at least when RxJava is not available.

In case of failure, we can trigger an error in all registered callbacks by calling CompletableFuture.completeExceptionally(). As surprising as it is, this is the entire implementation. The Future returned from toFuture() behaves as if it had some task attached in the background, whereas in reality we complete it explicitly.
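
A bare-bones illustration of such a manually completed Future (the values here are arbitrary):

CompletableFuture<String> promise = new CompletableFuture<>();   //no task, no thread pool behind it

//Callbacks can be registered before any value exists...
promise.thenAccept(v -> System.out.println("Received: " + v));

//...and they fire only when we complete the promise explicitly
promise.complete("done");
//or, on the failure path: promise.completeExceptionally(new RuntimeException("failed"));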

The transformation from Observable<T> to CompletableFuture<List<T>> is embarrassingly simple:

static <T> CompletableFuture<List<T>> toFutureList(Observable<T> observable) {
    return toFuture(observable.toList());
}

Interoperability between CompletableFuture and Observable is quite useful. The former is properly designed but lacks the expressiveness and richness of the latter. Therefore, if you are forced to deal with CompletableFuture in an otherwise RxJava-based application, apply these simple transformations as soon as possible to provide a consistent and predictable API. Make sure you understand the difference between the eager (hot) Future and the Observable, which is lazy by default.

Observable versus Single

Observable is a stream, potentially infinite, and all operators are described in terms of streams. But similar to a List<T>, which can have one element, a certain Observable<T> may by definition always emit exactly one event. It would be quite confusing to have a List<T> that holds exactly one element all the time; therefore, we simply use T or Optional<T> for that. In the land of RxJava there is a special abstraction for Observables emitting exactly one element, and it is called rx.Single<T>.

Single<T> is basically a container for a future value of type T or Exception. In that regard CompletableFuture from Java 8 is the closest cousin of Single. But unlike CompletableFuture, Single is lazy and does not begin producing its value until someone actually subscribes. Single is typically used for APIs known to return a single value (duh!) asynchronously and with high probability of failure.

Obviously, Single is a great candidate for request–response types of communication involving I/O, like a network call. The latency is typically high compared to a normal method invocation, and failures are inevitable. Moreover, because Single is lazy and asynchronous, we can apply all sorts of tricks to improve latency and resilience, such as invoking independent actions concurrently and combining responses together. Single reduces the confusion of APIs returning Observable by providing type-level guidance:

Observable<Float> temperature() {
    //...
}

It’s difficult to predict the contract of the preceding method. Does it return just one temperature measurement and complete? Or maybe it streams temperatures infinitely? Even worse, it might complete without any events under some circumstances. If temperature() returned Single<Float>, we would know immediately what output to expect.
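
With Single, the same operation would carry its contract in the return type; a sketch of the alternative declaration:

Single<Float> temperature() {
    //exactly one measurement (or an error) is guaranteed by the return type
    //...
}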

Creating and Consuming Single

Single is fairly similar to Observable in terms of the operators it supports, so we will not spend much time on them here. Instead, we will briefly compare them to Observable’s counterparts and focus on use cases for Single. There are a few ways to create a Single, beginning with the constant just() and error() operators:

Single<String> single = Single.just("Hello, world!");
single.subscribe(System.out::println);

Single<Instant> error =
        Single.error(new RuntimeException("Oops!"));
error
        .observeOn(Schedulers.io())
        .subscribe(
                System.out::println,
                Throwable::printStackTrace
        );

There are no overloaded versions of just() that take multiple values; after all, Single can hold only one value by definition. Also, the subscribe() method takes two arguments rather than three. There is simply no point in having an onCompleted() callback; Single completes with either a value (first callback) or an exception (second callback).

Listening on completion alone is equivalent to subscribing for a single value. Additionally, we included the observeOn() operator, which works exactly the same as its Observable peer. The same applies to subscribeOn(). Finally, you can use the error() operator to create a Single that always completes with a given Exception.

Let’s implement a more real-life scenario of making an HTTP request. After making an HTTP request we can provide a callback implementation that will be invoked asynchronously whenever a response or error comes back. This fits very nicely into how Single is created:

AsyncHttpClient asyncHttpClient = new AsyncHttpClient();

Single<Response> fetch(String address) {
    return Single.create(subscriber ->
            asyncHttpClient
                    .prepareGet(address)
                    .execute(handler(subscriber)));
}

AsyncCompletionHandler handler(SingleSubscriber<? super Response> subscriber) {
    return new AsyncCompletionHandler() {

        public Response onCompleted(Response response) {
            subscriber.onSuccess(response);
            return response;
        }

        public void onThrowable(Throwable t) {
            subscriber.onError(t);
        }
    };
}

Single.create() looks similar to Observable.create(), but it has some important constraints: you must call either onSuccess() once or onError() once. Technically, it is also possible to have a Single that never completes, but multiple onSuccess() invocations are not allowed. Speaking of Single.create(), you can also try Single.fromCallable(), which accepts a Callable<T> and returns a Single<T>. As simple as that.

Going back to our HTTP client example, when a response comes back we let subscribers know by calling onSuccess(), or we propagate the exception with onError() if the asynchronous failure callback fires. You can use Single in a similar fashion to Observable:

Single<String> example =
        fetch("http://www.example.com")
                .flatMap(this::body);
String b = example.toBlocking().value();

//...

Single<String> body(Response response) {
    return Single.create(subscriber -> {
        try {
            subscriber.onSuccess(response.getResponseBody());
        } catch (IOException e) {
            subscriber.onError(e);
        }
    });
}

//Same functionality as body():
Single<String> body2(Response response) {
    return Single.fromCallable(() ->
            response.getResponseBody());
}

Unfortunately, Response.getResponseBody() throws an IOException, so we cannot simply say: map(Response::getResponseBody). But at least we see how Single.flatMap() works. By wrapping the potentially dangerous getResponseBody() method with Single<String>, we make sure the potential failure is encapsulated and clearly expressed in the type system. Single.flatMap() works as you might expect, knowing Observable.flatMap(): if the second stage of computation (this::body in our case) fails, the entire Single fails as well.

Interestingly, Single has map() and flatMap() but no filter(). Can you guess why? filter() could potentially filter out the content of a Single<T> if it did not meet a certain Predicate<T>. But Single<T> must have exactly one item inside, whereas filter() could result in a Single having none.

Just like BlockingObservable, Single has its very own BlockingSingle, created with Single.toBlocking(). Analogously, creating BlockingSingle<T> does not yet block. However, calling value() on it blocks until the value of type T (the String containing the response body in our example) is available. In case of an exception, it will be rethrown from the value() method.

Combining Responses Using zip, merge, and concat

rx.Single would be useless if it did not provide composable operators. The most important operator you will come across is Single.zip(), which works the same way as Observable.zip() but has simpler semantics. Single always emits precisely one value (or exception) so the result of Single.zip() (or a Single.zipWith() instance version) is always exactly one pair/tuple. zip() is basically a way of creating a third Single when two underlying Singles complete.

Suppose that you are rendering an article to be displayed on your website. Three independent operations are needed to fulfill the request: reading the article content from the database, asking a social media website for the number of likes collected so far, and updating the read count metric. A naive implementation not only performs these three actions sequentially, but also risks unacceptable latency if any of the steps is slow. With Single, every step is modeled separately:

Single<String> content(int id) {
    return Single.fromCallable(() -> jdbcTemplate
            .queryForObject(
                    "SELECT content FROM articles WHERE id = ?",
                    String.class, id))
            .subscribeOn(Schedulers.io());
}

Single<Integer> likes(int id) {
    //asynchronous HTTP request to social media website
}

Single<Void> updateReadCount() {
    //only side effect, no return value in Single
}

As an example, we show how a Single can be created using fromCallable() by passing a lambda expression. This utility is quite useful because it manages error handling for us. The content() method uses the handy JdbcTemplate from the Spring framework to unobtrusively load the article content from the database. JDBC is an inherently blocking API, so we explicitly call subscribeOn() to make the Single asynchronous.

The implementation of likes() and updateReadCount() is omitted. You can imagine likes() making an asynchronous HTTP request to some API using RxNetty. updateReadCount() is interesting because it has the type Single<Void>. This suggests it performs some side effect that has no return value but significant latency.

Yet, we might still want to be notified about possible failures that happened asynchronously. RxJava has a special type for such cases as well: Completable, which either completes without a result or yields an exception, asynchronously.
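
A minimal sketch of how the same side effect could be expressed with Completable instead of Single<Void>, assuming an RxJava version in which Completable.fromAction() is available (the method name and body are hypothetical):

Completable updateReadCountCompletable() {
    return Completable.fromAction(() -> {
        //perform the side effect, e.g., increment a counter in the database
    });
}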

Combining these three operations with zip is quite straightforward:

Single<Document> doc = Single.zip(
        content(123),
        likes(123),
        updateReadCount(),
        (con, lks, vod) -> buildHtml(con, lks)
);

//...

Document buildHtml(String content, int likes) {
    //...
}

Single.zip() takes three Singles (it has overloaded versions for anything between two and nine instances) and invokes our custom function when all three are completed. The outcome of this custom function is then placed back in a Single<Document> instance that we can further transform. You should be aware that the Void result is never used by the transformation. This means that we wait until updateReadCount() completes, yet we do not need its (empty) result. This might be a requirement, or it can suggest a possible optimization: building an HTML document might work just as well if updateReadCount() is executed asynchronously without waiting for its completion or failure.
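
One possible sketch of that optimization, assuming we are fine with not waiting for the metric update: subscribe to updateReadCount() on its own and zip only the two Singles whose values we actually need:

//Fire and forget: asynchronous failures are still surfaced, but we do not wait for completion
updateReadCount().subscribe(
        ignored -> { /* Single<Void> carries no useful value */ },
        Throwable::printStackTrace);

Single<Document> doc = Single.zip(
        content(123),
        likes(123),
        (con, lks) -> buildHtml(con, lks));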

Now imagine what happens if invoking likes() fails or takes an unacceptably long time to complete (which is actually much worse). Without reactive extensions, rendering the HTML either fails entirely or takes a considerable amount of time. So far, our implementation is not much better in that regard. However, Single supports operators like timeout(), onErrorReturn(), and onErrorResumeNext() that enhance resiliency and error handling. All of these operators behave the same way as their Observable counterparts.
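
For example, a hedged sketch of how likes() could be hardened with a timeout and a fallback value; the 500 ms threshold and the default of zero likes are arbitrary choices for illustration:

Single<Integer> safeLikes = likes(123)
        .timeout(500, TimeUnit.MILLISECONDS)   //give up if the social media site is too slow
        .onErrorReturn(throwable -> 0);        //render the article with 0 likes instead of failing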

Interoperability with Observable and CompletableFuture

From the perspective of the type system, Observable and Single are unrelated. This basically means when Observable is required, you cannot use Single and vice versa. However there are two situations for which conversion between the two makes sense:

  • When we use Single as an Observable that emits one value and a completion notification (or an error notification)
  • When Single is missing certain operators available in Observable; cache() is one such example, as demonstrated in the following code:

Single<String> single = Single.create(subscriber -> {
    System.out.println("Subscribing");
    subscriber.onSuccess("42");
});

Single<String> cachedSingle = single
        .toObservable()
        .cache()
        .toSingle();

cachedSingle.subscribe(System.out::println);
cachedSingle.subscribe(System.out::println);

We use the cache() operator here so that the Single generates “42” only once, for the first subscriber. Single.toObservable() is a safe and easy-to-understand operator: it takes a Single<T> instance and converts it to an Observable<T> that emits one element immediately followed by a completion notification (or an error notification if that is how the Single completed).

The opposite, Observable.toSingle(), requires more attention. Just like single(), toSingle() will throw an exception saying “Observable emitted too many elements” if the underlying Observable emits more than one element. Similarly, expect “Observable emitted no items” if the Observable is empty:

Single<Integer> emptySingle =
        Observable.<Integer>empty().toSingle();
Single<Integer> doubleSingle =
        Observable.just(1, 2).toSingle();

Now you might think that when the toObservable() and toSingle() operators are used close to each other, the latter is safe, but that does not need to be the case. For example, an intermediate Observable might duplicate or discard an event emitted by the Single:

Single<Integer> ignored = Single
        .just(1)
        .toObservable()
        .ignoreElements() //PROBLEM
        .toSingle();

In the preceding code, ignoreElements() from Observable simply discards the single value emitted by the Single. Therefore, when the toSingle() operator is applied, all it can see is an Observable that completes without any elements. One thing to keep in mind is that the toSingle() operator, just like all the other operators we have discovered so far, is lazy. The exception about the Single not emitting precisely one event will appear only when someone actually subscribes.

When to Use Single?

Having two abstractions, Observable and Single, it is important to distinguish between them and understand when to use which one. Just like with data structures, one size does not fit all. You should use Single in the following scenarios:

  • An operation must complete with some particular value or an exception. For example, calling a web service always results in either a response from an external server or some sort of exception
  • There is no such thing as a stream in your problem domain; using Observable would be misleading and overkill
  • Observable is too heavyweight and you measured that Single is faster in your particular problem

On the other hand, you should prefer Observable for these circumstances:

  • You model some sort of events (messages, GUI events) that by definition occur several times, possibly infinitely
  • Or, entirely the opposite, you expect a value that may or may not appear before completion

The latter case is quite interesting. Do you think it makes sense for a findById(int) method on some repository to return Single<Record> rather than Record or Observable<Record>? Well, it sounds reasonable: we look up an item by ID (which suggests there is just one such Record). However, there is no guarantee that a Record exists for every ID we supply. Therefore, this method can technically return nothing, modeled as null, Optional<Record>, or Observable<Record>, the last of which is perfectly capable of representing an empty stream followed by a completion notification.
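
As a sketch of that design, here is one way an Observable-based repository method could handle a missing record; lookupOrNull() is a hypothetical helper that returns null when nothing matches:

Observable<Record> findById(int id) {
    Record record = lookupOrNull(id);   //hypothetical blocking lookup
    return record == null
            ? Observable.empty()        //no Record for this ID: empty stream plus completion
            : Observable.just(record);
}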

What about Single? It must complete either with a single value (Record) or with an exception. It is your design choice whether to model a nonexistent record with an exception, but this is often considered bad practice. Deciding whether a missing record for a given ID is a truly exceptional situation is not a responsibility of the repository layer.

Summary

In this chapter, we covered quite advanced topics related to designing entirely reactive applications, showing real-life techniques for implementing event-driven systems without introducing accidental complexity. We have also shown several benchmarks illustrating how RxJava pairs with a nonblocking networking stack like Netty to serve a very large number of concurrent connections. You are not forced to use such advanced libraries, but it certainly pays off when you strive for maximum throughput on commodity servers.