Truly reactive applications use streams pretty much from top to bottom. This approach simplifies reasoning and makes our application very consistent. Nonblocking applications tend to provide great performance and throughput for a fraction of the hardware. By limiting the number of threads, we are able to fully utilize CPU without consuming gigabytes of memory.
One of the limiting factors of scalability in Java is the I/O mechanism. The
java.io package is very well designed with lots of small
Reader/Writer implementations that decorate and wrap one another, adding one functionality at a time.
As much as I like this beautiful separation of concerns, standard I/O in Java is entirely blocking, meaning every thread that wishes to read or write from a
File must wait indefinitely for the result. Even worse, threads stuck on an I/O operation due to a slow network or an even slower spinning disk are hard to interrupt. Blocking on its own is not an issue: when one thread is blocked, others can still interact with the remaining open
Sockets. But threads are expensive to create and manage, and switching between them takes time. Java applications are perfectly capable of handling tens of thousands of concurrent connections, but you must design them carefully. This design effort is greatly reduced when RxJava is combined with some modern event-driven libraries and frameworks.
The C10k problem was an area of research and optimization that tried to achieve 10,000 concurrent connections on a single commodity server. Even these days, solving this engineering task with the traditional Java toolkit is a challenge.
We will go through several examples of a simple HTTP server and observe how it behaves with respect to the design choices we made. Admittedly, performance and scalability do have a complexity price tag. But with RxJava the additional complexity will be reduced significantly.
The classic thread per connection model struggles to solve the C10k problem. With 10,000 threads we do the following:
- Consume several gigabytes of RAM for stack space
- Put great pressure on the garbage collection mechanism, even though stack space itself is not garbage-collected (lots of GC roots and live objects)
- Waste a significant amount of CPU time simply switching cores to run different threads (context switching)
The classic thread-per-Socket model served us really well, and as a matter of fact it works quite well in many applications to this day. However, after you reach a certain level of concurrency, the number of threads becomes dangerous. A thousand concurrent connections handled by a single commodity server is not unusual, especially with long-living TCP/IP connections like HTTP with a
Keep-Alive header, server-sent events, or WebSockets. However, each thread occupies a bit of memory (stack space), regardless of whether it is computing something or just waiting idly for data.
There are two independent approaches to scalability: horizontal and vertical. To handle more concurrent connections, we can simply spin up more servers, each managing a subset of the load. This requires a frontend load-balancer and does not solve the original C10k problem, which expects just one server. Vertical scalability, on the other hand, means purchasing bigger and more capable servers. However, with blocking I/O we need a disproportionate amount of memory compared to a heavily underutilized CPU.
Even if a big enterprise server can handle hundreds of thousands of concurrent connections (at a very high price), it is far from solving the C10M problem—ten million concurrent connections. This number is not a coincidence; a couple of years ago, a properly designed Java application reached that enormous level on a typical server.
This chapter takes you on a journey through different ways of implementing an HTTP server. From single-threaded servers, through thread pools, to entirely event-driven architectures. The idea behind this exercise is to compare the implementation complexity versus performance and throughput. In the end, you will notice that the version using RxJava combines both relative simplicity and outstanding performance.
The purpose of this section is to compare how blocking servers, even when written properly, behave under high load. This is an exercise that we probably all went through during our education: writing a server on top of raw sockets. We will be implementing an extremely simple HTTP server that responds with 200 OK to every request. As a matter of fact, for the sake of simplicity, we will ignore the request altogether.
The simplest implementation just opens a
ServerSocket and handles client connections as they come. When a single client is served, all other requests are queued up. The following code snippet is actually very simple:
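A minimal sketch of such a single-threaded server might look as follows (class and helper names are illustrative, and the listening port is an arbitrary choice):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

// One thread accepts and serves every client, one after another
class SingleThread {
    private static final byte[] RESPONSE = (
            "HTTP/1.1 200 OK\r\n" +
            "Content-Length: 2\r\n" +
            "\r\n" +
            "OK").getBytes();

    public static void main(String[] args) throws IOException {
        serve(new ServerSocket(8080, 100));
    }

    static void serve(ServerSocket serverSocket) throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            Socket client = serverSocket.accept(); // blocks until a client connects
            handle(client);                        // blocks until this client is served
        }
    }

    static void handle(Socket client) {
        try (Socket c = client) {
            readFullRequest(c);                    // the request itself is ignored
            c.getOutputStream().write(RESPONSE);
        } catch (IOException ignored) {
            // a broken connection affects only this one client
        }
    }

    private static void readFullRequest(Socket client) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(client.getInputStream()));
        String line;
        // HTTP request headers are terminated by an empty line
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
            // skip header lines
        }
    }
}
```

Note that while `handle()` is busy with one client, `serve()` cannot reach `accept()` again, so every other client queues up in the operating system's backlog.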
For each request we ignore whatever was sent to us and return 200 OK responses. Opening
localhost:8080 in the browser succeeds with an OK text reply. The class is named
SingleThread for a reason.
ServerSocket.accept() blocks until any client establishes a connection with us. Then, it returns a client
Socket. While we interact with that
Socket (read and write to it), we still listen for incoming connections, but no one picks them up because our thread is busy handling the first client.
Now, coming back to our client connection, we first must read the entire request and then write the response. Both of these operations are potentially blocking and subject to network slowness and congestion. If one client establishes a connection but then waits a few seconds before sending a request, all other clients must wait. Having just a single thread for handling all incoming connections is clearly not very scalable; we barely solved the C1 (one concurrent connection) problem.
ThreadPerConnection shows how to implement a blocking server that creates a new thread for each client connection. This presumably scales quite well, but such an implementation suffers from the same problems as
fork() in C: starting a new thread takes time and resources, which is especially wasteful for short-lived connections. Moreover, there is no limit on the maximum number of client threads running at the same time. And when you do not put a limit on something in a computer system, the limit will be applied for you in the worst and least expected place. For example, our program becomes unstable and eventually crashes with
OutOfMemoryError in the case of thousands of concurrent connections.
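A sketch of this variant (again with illustrative names) differs from the single-threaded one only in where the handling code runs:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

// Every client gets a brand new Thread; nothing limits how many exist at once
class ThreadPerConnection {
    private static final byte[] RESPONSE = (
            "HTTP/1.1 200 OK\r\n" +
            "Content-Length: 2\r\n" +
            "\r\n" +
            "OK").getBytes();

    public static void main(String[] args) throws IOException {
        serve(new ServerSocket(8080, 100));
    }

    static void serve(ServerSocket serverSocket) throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            Socket client = serverSocket.accept();
            // starting a thread costs time plus roughly a megabyte of stack space
            new Thread(() -> handle(client)).start();
        }
    }

    static void handle(Socket client) {
        try (Socket c = client) {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(c.getInputStream()));
            String line;
            while ((line = reader.readLine()) != null && !line.isEmpty()) {
                // skip request headers
            }
            c.getOutputStream().write(RESPONSE);
        } catch (IOException ignored) {
            // a broken connection affects only this one client
        }
    }
}
```

The accepting loop never blocks on a client, but the unbounded `new Thread(...)` per connection is precisely what makes this design fragile under load.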
ThreadPool also uses a thread per connection, but threads are recycled when a client disconnects, so we do not pay the price of thread warm-up for every client. This is pretty much how all popular servlet containers like Tomcat and Jetty work, managing 100 to 200 threads in a pool by default. Tomcat has the so-called NIO connector that handles some of the operations on sockets asynchronously, but the real work in servlets and frameworks built on top of them is still blocking. This means that traditional applications are inherently limited to a couple of thousand connections, even when built on top of modern servlet containers.
We will now focus on event-driven approaches to writing an HTTP server, which are far more promising in terms of scalability. A blocking processing model involving thread-per-request clearly does not scale. We need a way of managing several client connections with just a handful of threads. This has a lot of benefits:
- Reduced memory consumption
- Better CPU and CPU cache utilization
- Greatly improved scalability on a single node
One caveat is the lost simplicity and clarity. Threads are not allowed to block on any operation, so we can no longer pretend that receiving or sending data over the wire is the same as a local method invocation. The latency is unpredictable and response times are higher by orders of magnitude. By the time you read this, there will probably still be quite a few spinning hard drives out there, which are even slower than local area networks.
In this section, we will develop a tiny event-driven application with the Netty framework and later refactor it to use RxNetty. Finally, we conclude with a benchmark of all solutions.
Netty is entirely event-driven; we never block waiting for data to be sent or received. Instead, raw bytes in the form of ByteBuf instances are pushed to our processing pipeline.
TCP/IP gives us the impression of a connection and of data flowing byte after byte between two computers. In reality, TCP/IP is built on top of IP, which merely transfers independent chunks of data known as packets. It is the operating system's role to assemble them in the correct order and give the illusion of a stream. Netty drops this abstraction and works at a byte-sequence layer, not a stream. Whenever a few bytes arrive at our application, Netty notifies our handler. Whenever we send a few bytes, we get a
ChannelFuture without blocking (more on futures in a second).
Our example of a nonblocking HTTP server has three components. The first simply starts the server and sets up the environment:
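A sketch of that bootstrap code, assuming Netty 4.x on the classpath (the class name and the backlog value are illustrative; the pipeline here only installs the built-in HTTP codec, with a placeholder for the business-logic handler):

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class HttpNettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // processes events
        try {
            new ServerBootstrap()
                    .option(ChannelOption.SO_BACKLOG, 50_000)
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // decodes HTTP requests and encodes HTTP responses
                            ch.pipeline().addLast(new HttpServerCodec());
                            // business-logic handler(s) would be added here
                        }
                    })
                    .bind(8080)
                    .sync()
                    .channel()
                    .closeFuture()
                    .sync();                                   // block main() until shutdown
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```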
This is the most basic HTTP server in Netty. The crucial parts are the bossGroup pool, responsible for accepting incoming connections, and the workerGroup pool, which processes events. These pools are not very big: one thread for bossGroup and close to the number of CPU cores for workerGroup, but this is more than enough for a well-written Netty server. We have not yet specified what the server should do, apart from listening on port 8080. This is configurable via a ChannelInitializer that assembles the processing pipeline.
Rather than providing a single function that handles the connection, we build a pipeline that processes incoming ByteBuf instances as they arrive. The first step of the pipeline decodes raw incoming bytes into higher-level HTTP request objects. This handler is built-in. It is also used for encoding the HTTP response back to raw bytes. In more robust applications you will often see more handlers focused on smaller functionality; for example, frame decoding, protocol decoding, security, and so on. Every piece of data and notification flows through this pipeline.
The second step of our pipeline is the business logic component that actually handles the request rather than just intercepting or enriching it. Although
HttpServerCodec is inherently stateful (it translates incoming packets to high-level
HttpRequest instances), our custom HttpHandler can be a stateless singleton:
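A sketch of such a handler, assuming Netty 4.x (the @Sharable annotation marks the handler as safe to reuse across channels, which is what makes the stateless singleton possible):

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static java.nio.charset.StandardCharsets.UTF_8;

@Sharable
class HttpHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            // the same canned 200 OK for every request
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                    HTTP_1_1, OK, Unpooled.wrappedBuffer("OK".getBytes(UTF_8)));
            response.headers().add("Content-Length", 2);
            // write() is nonblocking; the returned ChannelFuture lets us react
            // asynchronously, here by closing the channel once the write completes
            ctx.writeAndFlush(response)
               .addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
```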
After constructing the response object, we write() it back to the channel. Unlike with ordinary sockets, write() does not block. Instead, it returns a ChannelFuture to which we can subscribe via addListener() and close the channel asynchronously.
Channel is an abstraction over a communication link—for example, an HTTP connection—therefore closing a channel closes the connection. Again, we would not do this if we wanted to implement persistent connections.
Netty uses just a handful of threads to process possibly thousands of connections. We do not keep any heavyweight data structures or threads per connection. This is much closer to what actually happens close to the metal: the computer receives an IP packet and wakes up the process listening on the destination port. TCP/IP connections are just an abstraction, often implemented using threads. However, when the application is much more demanding in terms of load and the number of connections, operating directly at the packet level is much more robust. We still have channels (lightweight representations of connections) and pipelines with possibly stateful handlers.
Netty is an important backbone behind plenty of successful products and frameworks such as Akka, Elasticsearch, HornetQ, the Play framework, Ratpack, and Vert.x, to name a few. There is also a thin wrapper around Netty that bridges between its API and RxJava. Let's rewrite the nonblocking Netty server into RxNetty. But we will begin with a simple currency server to become familiar with the API:
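A sketch of such a currency server, assuming the RxNetty 0.5.x API (the RATE constant and the eurToUsd() helper are illustrative, not part of any library):

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import rx.Observable;

public class EurUsdCurrencyTcpServer {
    private static final BigDecimal RATE = new BigDecimal("1.06448");

    public static void main(String[] args) {
        TcpServer
                .newServer(8080)
                .<String, String>pipelineConfigurator(pipeline -> {
                    // reassemble ByteBuf chunks into full lines...
                    pipeline.addLast(new LineBasedFrameDecoder(1024));
                    // ...and decode each line into a String
                    pipeline.addLast(new StringDecoder(UTF_8));
                })
                .start(connection -> {
                    Observable<String> output = connection
                            .getInput()              // Observable<String>, one line per event
                            .map(BigDecimal::new)
                            .flatMap(eur -> eurToUsd(eur));
                    return connection.writeAndFlushOnEach(output);
                })
                .awaitShutdown();
    }

    static Observable<String> eurToUsd(BigDecimal eur) {
        return Observable
                .just(eur.multiply(RATE))
                .map(usd -> eur + " EUR is " + usd + " USD\n")
                // fake latency of a remote exchange service; no thread sleeps here
                .delay(1, TimeUnit.SECONDS);
    }
}
```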
This is a self-sufficient, standalone TCP/IP server written on top of RxNetty. You should have a rough understanding of its major parts. First, we create a new TCP/IP server listening on port 8080. Netty provides a rather low-level abstraction of
ByteBuf messages flowing through a pipeline. We must configure such a pipeline, as well. The first handler rearranges (splits and joins when needed)
ByteBuf sequences into sequences of lines using built-in
LineBasedFrameDecoder. Second, the decoder transforms a
ByteBuf containing full lines into actual
String instances. From this point, we are working exclusively with Strings.
Every time a new connection arrives, the callback is executed. The connection object allows us to asynchronously send and receive data. First, we begin with
connection.getInput(). This object is of type
Observable<String> and emits a value every time a new line of the client’s request appears on the server. The
getInput() Observable notifies us asynchronously about new input. First, we parse the String into
BigDecimal. Then, using the helper method
eurToUsd(), we fake calling some currency exchange service. To make the example more realistic, we artificially applied
delay() so that we must wait a little bit for the response. Obviously,
delay() is asynchronous and does not involve any sleeping. In the meantime, we keep receiving and transforming requests along the way.
After all these transformations the output Observable is fed directly into
writeAndFlushOnEach(). I believe this is quite understandable—we receive a sequence of inputs, transform them, and use the transformed sequence as a sequence of outputs. Now, let’s interact with this server using
telnet. Notice how some responses appear after several requests were consumed due to faked currency server latency:
We treat our server like a function of request data into response data. Because the TCP/IP connection is not just a simple function but a stream of sometimes interdependent chunks of data, RxJava works amazingly well in this scenario. A rich set of operators makes it easy to transform input to output in nontrivial ways. Of course, the output stream does not have to be based on input; for example, if you are implementing server-sent events, the server simply publishes data irrespective of incoming data.
EurUsdCurrencyTcpServer is reactive because it only acts when data comes in. We do not have a dedicated thread per each client. This implementation can easily withstand thousands of concurrent connections, and vertical scalability is limited only by the amount of traffic it must handle, not the number of more-or-less idle connections.
Knowing how RxNetty works in principle, we can go back to the original HTTP server that returns OK responses. RxNetty has built-in support for HTTP clients and servers, but we will begin from a plain implementation based on TCP/IP:
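A sketch of that implementation, again assuming the RxNetty 0.5.x API (class name and buffer sizes are illustrative):

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import rx.Observable;

public class HttpTcpRxNettyServer {
    // the entire canned response, emitted as a single String event
    private static final Observable<String> RESPONSE = Observable.just(
            "HTTP/1.1 200 OK\r\n" +
            "Content-Length: 2\r\n" +
            "\r\n" +
            "OK");

    public static void main(String[] args) {
        TcpServer
                .newServer(8080)
                .<String, String>pipelineConfigurator(pipeline -> {
                    pipeline.addLast(new LineBasedFrameDecoder(1024));
                    pipeline.addLast(new StringDecoder(UTF_8));
                })
                .start(connection -> {
                    Observable<String> output = connection
                            .getInput()
                            // an empty line marks the end of the HTTP request
                            .filter(String::isEmpty)
                            .flatMap(endOfRequest -> RESPONSE);
                    return connection.writeString(output);
                })
                .awaitShutdown();
    }
}
```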
With EurUsdCurrencyTcpServer in mind, understanding
HttpTcpRxNettyServer should be fairly simple. Because for educational purposes we always return a static 200 OK response, there is no point in parsing the request. However, a well-behaved server should not send a response before it has read the request. Therefore, we begin by looking for an empty line in
getInput(), marking the end of the HTTP request. Only then do we produce the 200 OK line. The output
Observable built this way is passed to
connection.writeString(). In other words, the response will be sent to the client as soon as the request contains the first empty line.
Implementing an HTTP server using TCP/IP is an entertaining exercise that helps you to understand the intricacies of HTTP. Luckily, we are not forced to implement HTTP and RESTful web services using TCP/IP abstraction all the time. Similar to Netty, RxNetty also has a bunch of built-in components to serve HTTP:
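With those built-in HTTP components, the whole OK server shrinks to a few lines. A sketch, assuming the RxNetty 0.5.x API:

```java
import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

public class RxNettyHttpServer {
    public static void main(String[] args) {
        HttpServer
                .newServer(8080)
                .start((request, response) ->
                        // no manual codecs or pipelines: HTTP is already decoded
                        response
                                .setHeader("Content-Length", 2)
                                .writeString(Observable.just("OK")))
                .awaitShutdown();
    }
}
```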
If you are bored with just returning a static
200 OK, we can build a nonblocking RESTful web service with relative ease, again for currency exchange:
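A sketch of such a service, assuming the RxNetty 0.5.x API (the class name, the RATE constant, and the response format are illustrative; the amount to convert is taken from the request path):

```java
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;

public class RestCurrencyServer {
    private static final BigDecimal RATE = new BigDecimal("1.06448");

    public static void main(String[] args) {
        HttpServer
                .newServer(8080)
                .start((request, response) -> {
                    // e.g. GET /10.99 converts 10.99 EUR to USD
                    String amountStr = request.getDecodedPath().substring(1);
                    BigDecimal amount = new BigDecimal(amountStr);
                    Observable<String> body = Observable
                            .just(amount)
                            .map(eur -> eur.multiply(RATE))
                            // fake the latency of a remote exchange service
                            .delay(100, TimeUnit.MILLISECONDS)
                            .map(usd -> amount + " EUR is " + usd + " USD");
                    return response.writeString(body);
                })
                .awaitShutdown();
    }
}
```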
We can interact with this server using a web browser or
curl. The initial
substring(1) is required to strip the leading slash from the request path.
Having a handful of implementations of this simple HTTP server we can compare them in terms of performance, scalability, and throughput. This is the reason why we abandoned the familiar thread-based model and began using RxJava and asynchronous APIs in the first place.
To illustrate why writing a nonblocking, reactive HTTP server is valuable and pays off, we will run a series of benchmarks against each implementation. Interestingly, our benchmarking tool of choice,
wrk, is also nonblocking; otherwise, it would fail to simulate load equivalent to tens of thousands of concurrent connections. Another interesting alternative is Gatling, which is built on top of the Akka toolkit. Traditional thread-based load tools like JMeter and
ab fail to simulate such excessive load and become a bottleneck themselves.
Every JVM-based implementation was benchmarked against 10,000, 20,000, and 50,000 concurrent HTTP clients, and thus TCP/IP connections. We were interested in the number of requests per second (throughput) as well as the median and 99th percentile response times. Just as a reminder: the median means that 50% of the requests were faster (and 50% slower), whereas the 99th percentile tells us that only 1% of the requests were slower than the given value.
The first benchmark compares how various implementations are performing when they simply return 200 OKs and perform no backend tasks. This is a somewhat unrealistic benchmark but it will give us a notion of the server and Ethernet upper limits. In subsequent tests we will add some arbitrary sleep inside every server.
Keep in mind that this benchmark is just a warm-up before real scenarios involving some work on the server side. But we can already see a few interesting trends:
- Netty and RxNetty-based implementations using raw TCP/IP have the best throughput, almost reaching 200,000 requests per second
- SingleThread implementation is significantly slower, being able to handle about 6,000 requests per second, irrespective of the concurrency level
- SingleThread is the fastest implementation when there is just one client; the overhead of thread pools, event-driven (Rx)Netty, and pretty much any other implementation is visible. This advantage quickly diminishes when the number of clients grows. Moreover, the throughput of the server is highly dependent on the performance of the client
- ThreadPool performs really well, but it becomes unstable under high load (lots of errors reported by wrk) and fails entirely when confronted with 50,000 concurrent clients (10-second timeout reached)
- ThreadPerConnection also performs very well, but above 100–200 threads the server's throughput quickly drops. Also, 50,000 threads put a lot of pressure on the JVM; in particular, the few extra gigabytes of stack space are troublesome
To simulate some work on the server-side, we will simply inject
sleep() invocation in between request and response. This is fair: often servers are not performing any CPU-intensive work to fulfill a request. Traditional servers block on external resources, consuming one thread. Reactive servers, on the other hand, simply wait for an external signal (like event or message containing response), releasing underlying resources in the meantime.
For that reason, for blocking implementations we simply added
sleep(), whereas for nonblocking servers we will use
Observable.delay() and similar operators to simulate the nonblocking, slow response of some external service, as demonstrated in the following example:
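The two flavors can be contrasted in a small sketch (method names are illustrative); note that only the blocking variant pins a thread for the full 100 milliseconds:

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

class SimulatedLatency {
    // blocking flavor: the servicing thread sleeps, occupied for 100 ms
    static String blockingResponse() throws InterruptedException {
        Thread.sleep(100);
        return "OK";
    }

    // nonblocking flavor: the response is merely scheduled 100 ms into
    // the future; no thread sleeps in the meantime
    static Observable<String> nonBlockingResponse() {
        return Observable
                .just("OK")
                .delay(100, TimeUnit.MILLISECONDS);
    }
}
```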
There was no point in using a nonblocking delay in the blocking implementations because they would still have to wait for the response, even if the underlying implementation were nonblocking. That being said, we injected a 100-millisecond delay into each request so that each interaction with the server takes at least a tenth of a second. The benchmark is now much more realistic and interesting. The number of requests per second versus client connections is shown in the following graphic:
The results more closely follow what one could expect from a real-life load. The two Netty-based implementations (including
HttpTcpRxNettyServer) at the top are by far the fastest, easily reaching 90,000 requests per second (RPS). As a matter of fact, up until about 10,000 concurrent clients, the server scales linearly. It is very simple to prove: one client generates about 10 RPS (each request takes around 100 milliseconds, so 10 requests fit in 1 second). Two clients generate up to 20 RPS, 5 clients up to 50 RPS, and so on. At about 10,000 concurrent connections we should expect 100,000 RPS, and we are close to that theoretical limit (90,000 RPS).
At the bottom, we see the SingleThread and ThreadPool servers. Their performance results are miserable, which does not come as a surprise. A single thread processing requests, each taking at least 100 milliseconds, clearly cannot handle more than 10 RPS.
ThreadPool is much better: having 100 threads, each processing 10 RPS, it totals 1,000 RPS. These results are still worse by a few orders of magnitude compared to the reactive Netty and RxJava implementations.
SingleThread implementation was rejecting almost every request under high load. At around 50,000 concurrent connections, it was accepting a marginal number of requests and almost never met the 10-second timeout imposed by wrk.
You might ask, why restrict
ThreadPool to just 100 threads? This number is similar to what popular HTTP servlet containers default to, but surely we can specify more. Because all connections are persistent and keep a thread from the pool for the duration of the entire connection, you can treat
ThreadPerConnection like a thread pool with an unlimited number of threads. Surprisingly, such an implementation works quite well, even when the JVM must manage 50,000 concurrent threads, each representing one connection.
As a matter of fact,
ThreadPerConnection is not much worse than RxNettyHttpServer. It turns out that throughput measured in RPS is not sufficient; we must also look at the response times of individual requests. It depends on your requirements, but typically you need both great throughput to utilize the server and low response times so that perceived performance is great as well.
Average response time is rarely a good indicator. On one hand, the average hides outliers (the few requests that are unacceptably slow); on the other, the typical response time (the one observed by most clients) is smaller than the average, again due to outliers. Percentiles prove to be much more indicative, effectively describing the distribution of a particular value. The following diagram shows the 99th percentile of response time for each implementation versus the number of concurrent connections (or clients). The value on the Y axis tells us that 99% of the requests were faster than a given value. Obviously, we want these numbers to be as low as possible (but they cannot be lower than the 100 milliseconds of simulated delay) and to grow as little as possible with increasing load, as depicted in the following chart:
The ThreadPerConnection implementation stands out badly. Up to 1,000 concurrent connections, all implementations go side by side. But at some point
ThreadPerConnection becomes really slow to respond, several times slower than its competitors. There are two primary reasons for that: first, excessive context switches between thousands of threads, and second, more frequent garbage collection cycles. Basically, the JVM spends a lot of time on housekeeping and not much is left for actual work. Thousands of concurrent connections sit idle, waiting for their turn.
You might be surprised that the
ThreadPool implementation has such an outstanding 99th percentile of response time. It outperforms all other implementations and remains stable even under high load. Let's quickly recap what the implementation of
ThreadPool looked like:
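A sketch of that construction follows. The class name is illustrative and the rejection handler here merely counts rejections; in the real server, it would immediately send an error response to the rejected client:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FailFastPool {
    static final AtomicInteger rejected = new AtomicInteger();

    static ThreadPoolExecutor create(int threads, int queueSize) {
        return new ThreadPoolExecutor(
                threads, threads,                    // fixed-size pool, e.g. 100 threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize), // bounded queue of waiting clients
                (runnable, pool) ->
                        // fail fast: when pool and queue are both full, reject
                        // immediately instead of letting requests pile up
                        rejected.incrementAndGet());
    }
}
```

The bounded `ArrayBlockingQueue` is the essential piece: with the default unbounded queue, requests would accumulate indefinitely and the rejection handler would never fire.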
Rather than using the
Executors builder, we built
ThreadPoolExecutor directly, taking control of
RejectedExecutionHandler. The latter is executed when the former runs out of queue space. Basically, we prevent server overload, ensuring that requests that cannot be served quickly are rejected immediately. No other implementation has a similar safety feature, often called fail-fast.
Errors, as reported by the wrk load-testing tool, are nonexistent to marginal for all of the implementations except
ThreadPool. This is an interesting trade-off:
ThreadPool always responds as soon as possible, much faster than its competitors. However, it is also very eager to reject requests immediately when it is being overwhelmed. Of course, you can implement a similar mechanism on top of a reactive implementation with Netty/RxJava.
TCP/IP, and HTTP built on top of it, are inherently event driven. Despite providing the illusion of input and output pipes, underneath there are packets of data arriving asynchronously. As with nearly every abstraction in computer science, treating the network stack as a blocking stream of bytes becomes leaky, especially when we want to take full advantage of the hardware.
Classic approaches to networking are just fine, even under moderate load. But to scale up to limits unheard of in traditional Java applications, you must go reactive. Although Netty is a wonderful framework for building reactive, event-driven network applications, it is rarely used directly. Instead, it is part of a wide range of libraries and frameworks, including RxNetty. RxNetty is especially interesting because it combines the power of event-driven networking with the simplicity of RxJava's operators. We still treat network communication as a flow of messages (packets), but abstracted away with Observables.
Remember how we defined the problem of 10,000 concurrent connections in “Beating the C10k Problem”? We managed to solve it with several Netty and RxNetty implementations. As a matter of fact, we successfully implemented servers that withstood C50k, handling 50,000 concurrent persistent HTTP connections. With more client hardware (because the server was doing just fine) and less frequent requests going through the wire, the same implementations could easily survive C100k and more—using about a dozen lines of code.
Obviously, implementing the server part of HTTP (or any other protocol for that matter; HTTP was chosen due to its ubiquity) is just one side of the story. It is equally important what the server is doing, and most of the time it becomes a client of another server. So far in this chapter, we have focused on reactive, nonblocking HTTP servers. This is reasonable, but there are multiple, sometimes surprising, places where blocking code can sneak in. First of all, we paid a lot of attention to the server side, whereas we entirely skipped the client part.
But modern servers, especially in large distributed systems, serve the role of client, as well, requesting and pushing data to many downstream services. It is safe to assume that a single request to a popular search engine can span hundreds or even thousands of downstream components, thus making plenty of client requests. It is obvious that if these requests were blocking and sequential, the search engine’s response time would be unbearably slow.
No matter how perfectly we implement the server’s infrastructure code, if it still has to deal with blocking APIs, the scalability will be harmed, just like our benchmarks have shown. There are a few known sources of blocking in the Java ecosystem in particular, which we will explore briefly.
Servers that simply make several requests to downstream services and combine the responses together are not unheard of. In fact, you can probably find dozens of startups that managed to cleverly mash up several available data sources and provide a valuable service simply on top of that. Today’s APIs are mostly RESTful with SOAP playing a diminishing role—but both based on the ever-prevalent HTTP.
Even a single blocking request can bring a server down, significantly degrading performance. Fortunately, there is a wide range of mature HTTP clients that are nonblocking and we already met the first one: Netty. There are two classes of problems that a nonblocking HTTP client tries to solve:
- Large number of independent concurrent requests, each requiring a few client calls to third-party APIs. This is typical for service-oriented architectures for which one request spans multiple services
- A server is making a large number of HTTP client requests, probably during batch operations. Think about web crawlers or indexing services that constantly keep thousands of connections open
Regardless of the nature of the server, the problem remains the same: maintaining large (tens of thousands and more) open HTTP connections introduces significant overhead. This is especially painful when services we connect to (this time as a client) are slow, therefore requiring us to hold the resources for a long time.
In contrast, a TCP/IP connection is actually quite lightweight. The operating system must keep a socket descriptor for each open connection (around one kilobyte) and that is pretty much it. When a packet (message) arrives, the kernel dispatches it to the appropriate process, such as a JVM. One kilobyte is quite a small memory footprint compared to the roughly one megabyte consumed by the stack of each thread blocked on a socket. That being said, the classic thread-per-connection model does not scale in the case of high-performance servers, and we need to embrace the underlying networking model rather than trying to abstract it away with blocking code. The good news is that RxJava plus Netty provides a much better abstraction, still relatively close to the metal.
RxJava together with Netty provides an abstraction that is sufficiently close to the way networks work. Rather than pretending that an HTTP request is almost like an ordinary method call within JVM, it embraces asynchrony.
Moreover, we can no longer pretend that HTTP is just a request–response protocol. The emergence of server-sent events (one request, multiple responses), WebSockets (full-duplex communication), and finally HTTP/2 (many parallel requests and responses over the same wire, interleaving with one another) reveals many different usage scenarios for HTTP.
RxNetty on the client side provides quite a concise API for the simplest use cases. You make a request and get back an easily composable Observable.
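A sketch of such a client call, assuming the RxNetty 0.5.x API (the host, port, and path are illustrative):

```java
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClient;
import rx.Observable;

public class RxNettyClientExample {
    public static void main(String[] args) {
        Observable<ByteBuf> chunks = HttpClient
                .newClient("example.com", 80)   // host and port are illustrative
                .createGet("/")
                // emits each chunk of the response body as it arrives
                .flatMap(response -> response.getContent());

        chunks.subscribe(
                buf -> System.out.println("received " + buf.readableBytes() + " bytes"));
    }
}
```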
The createGet() method returns a subclass of
Observable<HttpClientResponse>. Obviously, the client does not block waiting for the response, so the
Observable seems like a good choice. But this is just the beginning.
HttpClientResponse itself has a
getContent() method that returns Observable<ByteBuf>.
If you recall from “Nonblocking HTTP Server with Netty and RxNetty”,
ByteBuf is an abstraction over a chunk of data received over the wire. From the client perspective, this is part of the response. That is right: RxNetty goes a bit further than other nonblocking HTTP clients and does not simply notify us when the entire response arrives. Instead, we get a stream of
ByteBuf messages, optionally followed by
Observable completion when the server decides to drop the connection.
Of course, RxJava has plenty of ways to assemble them back together, such as
Observable.reduce(). But it is your choice: if you want to consume data as it comes in small bits, that is absolutely fine. In that regard, RxNetty is quite low-level, but because the abstraction does not impose a major performance bottleneck like excessive buffering or blocking, it turns out to be extremely scalable. If you are looking for a reactive and robust but more high-level HTTP client, see “Retrofit with Native RxJava Support”.
As opposed to callback-based reactive APIs, RxNetty plays very nicely with other
Observables: you can easily parallelize, combine, and split work. For example, imagine that you have a stream of URLs to which you must connect and consume data in real time. This stream can be fixed (built from a simple
List<URL>) or dynamic, with new URLs appearing all the time. If you want a steady stream of packets flowing through all of these sources, you can simply
flatMap() over them:
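A sketch of that pattern, again assuming the RxNetty 0.5.x API (the endpoint URLs are illustrative, and a missing port falls back to 80):

```java
import java.net.URL;

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClient;
import rx.Observable;

public class MergedSources {
    public static void main(String[] args) throws Exception {
        Observable<URL> sources = Observable.just(   // could just as well be dynamic
                new URL("http://example.com/feed"),  // endpoints are illustrative
                new URL("http://example.org/feed"));

        // one merged stream of chunks from all connections
        Observable<ByteBuf> packets = sources
                .flatMap(url -> HttpClient
                        .newClient(url.getHost(), url.getPort() == -1 ? 80 : url.getPort())
                        .createGet(url.getPath())
                        .flatMap(response -> response.getContent()));

        packets.subscribe(buf -> System.out.println(buf.readableBytes() + " bytes"));
    }
}
```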
This is a slightly contrived example because it mixes together
ByteBuf messages from different sources, but you get the idea. For each URL in the upstream
Observable, we produce an asynchronous stream of
ByteBuf instances from that URL. If you want to first transform incoming data, perhaps by combining chunks of data into a single event — you can do this easily, for example with an operator such as reduce().
Here is the upshot: you can easily have tens of thousands of open HTTP connections, idle or receiving data. The limiting factor is no longer memory, but the processing power of your CPU and network bandwidth. JVM does not need to consume gigabytes of memory to process a reasonable number of transactions.
HTTP APIs are one of the major bottlenecks in modern applications. They are not expensive in terms of CPU, but blocking HTTP calls behaving like ordinary procedural calls substantially limit scalability. And even if you carefully remove blocking HTTP communication, synchronous code can appear in the most surprising places.
It is a pitfall of the
equals() method in
java.net.URL that it can make a network request. That’s right: when you compare two instances of the
URL class, this seemingly fast method may make a network roundtrip (call sequence, read top to bottom):
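The call sequence looks roughly like this in OpenJDK (exact details vary between JDK versions):

```
java.net.URL.equals(Object)
  java.net.URLStreamHandler.equals(URL, URL)
    java.net.URLStreamHandler.sameFile(URL, URL)
      java.net.URLStreamHandler.hostsEqual(URL, URL)
        java.net.URLStreamHandler.getHostAddress(URL)
          java.net.InetAddress.getByName(String)
            ...
              lookupAllHostAddr(String)   // native DNS lookup
```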
To determine whether two
URLs are equal, JVM calls
lookupAllHostAddr(), which (in native code) calls gethostbyname (or similar), which can make a synchronous request to a DNS server. This can have a disastrous effect when you have just a handful of threads and a few of them are unexpectedly blocked. Remember our RxNetty-based servers? They were using a few dozen threads at most. Another disastrous situation can happen when
URL.equals() is invoked frequently, such as in a
Set<URL>. This unexpected behavior of
URL is rather well known, just like the fact that its
equals() can actually yield different results depending on Internet connectivity.
In previous parts, we concluded that every server eventually becomes a client of some different service. Another interesting observation is that pretty much every computer system we’ve had a chance to work with was distributed. When two machines separated by a network cable need to communicate with each other, they are already spatially distributed. Taking that to the extreme, you might even consider every computer a distributed system, with each CPU core’s independent caches that are not always consistent and must synchronize with one another via a message-passing protocol. But let’s stick to the application server versus database server architecture.
The long-existing standard for relational database access in Java is called Java Database Connectivity (JDBC). From the consumer perspective, JDBC provides a set of APIs for communicating with any relational database like PostgreSQL, Oracle Database, and many others. The core abstractions are
Connection (TCP/IP, wire connection),
Statement (a database query), and
ResultSet (view over the database result).
Today, developers very rarely use this API directly because more user-friendly abstractions exist, from lightweight
JdbcTemplate in Spring framework, through code generation libraries like jOOQ, to object-relational mapping solutions like JPA. JDBC has a notorious reputation for difficult error handling combined with checked exceptions (much simpler with try-with-resources since Java 7):
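A minimal sketch of such a try-with-resources interaction (it assumes the H2 driver on the classpath; the query is purely illustrative):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class JdbcExample {
    public static void main(String[] args) throws SQLException {
        // all three resources are closed automatically, in reverse order,
        // even if an SQLException is thrown along the way
        try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test");
             PreparedStatement statement = connection.prepareStatement("SELECT 2 + 2");
             ResultSet resultSet = statement.executeQuery()) {
            if (resultSet.next()) {
                System.out.println(resultSet.getInt(1));
            }
        }
    }
}
```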
The preceding example uses an embedded H2 database, often utilized during integration tests. But in production, you rarely see a database instance running on the same machine as the application. Every interaction with the database requires a network roundtrip. The core part of JDBC is the API, which every database vendor must implement.
When asking the JDBC API for a new
Connection, the implementation must make a physical connection to the database by opening a client socket, authorizing, and so on. Databases have different wire protocols (almost universally binary) and the responsibility of the JDBC implementation (also known as
Driver) is to translate this low-level network protocol into a consistent API.
This works quite well (putting aside different SQL dialects). Unfortunately, when the JDBC standard was released with JDK 1.1 around 1997, nobody predicted how important reactive and asynchronous programming would become two decades later. The API has gone through many versions since, but all of them are inherently blocking, waiting for each database operation to complete.
This is precisely the same problem as we had with HTTP. You must have as many threads in your application as there are active database operations (queries). JDBC is the only mature standard for accessing a variety of relational databases in a portable way (again, SQL dialect differences put aside). The servlet specification was significantly revamped in version 3.0 by introducing the
HttpServletRequest.startAsync() method several years ago. It’s too bad that the JDBC standard still clings to the classic blocking model.
There are reasons for JDBC to remain blocking. Web servers can easily handle hundreds of thousands of open connections, for example when they just occasionally stream small bits of data. Database systems, on the other hand, perform several more or less similar steps for each client query:
- Query parsing (CPU-bound) translates a String containing a query into a parse tree
- Query optimizer (CPU-bound) evaluates the query against various rules and statistics, trying to build an execution plan
- Query executor (I/O-bound) traverses database storage and finds appropriate tuples to return
- Result set (network-bound) is serialized and pushed back to the client
Clearly, every database needs a lot of resources to perform a query. Typically, the majority of time is actually spent executing the query, and disks (spinning or SSD) are not very parallel by design. Therefore, there is a limited number of concurrent queries that a database system can and should perform before it saturates. This limit largely depends on the actual database engine being used and the hardware on which it’s running.
There are also many other less-obvious aspects like locks, context switches, and CPU cache lines exhaustion. You should expect around a few hundred queries per second. This is very little compared to, for example, the hundreds of thousands of open HTTP connections, easily achievable with nonblocking APIs.
Knowing that the throughput of the database is severely limited by hardware, having fully and entirely reactive drivers does not make that much sense after all. Technically, you can implement a wire protocol on top of Netty or RxNetty and never block the client thread. But knowing that the JVM can handle hundreds to thousands of threads without much hassle (see “Thread per Connection”), there does not seem to be much benefit in rewriting the well-established JDBC API from the ground up. Even Slick, from the commonly used Lightbend reactive stack powered by the Akka toolkit, uses JDBC underneath. There are also community-led projects bridging between RxJava and JDBC, such as rxjava-jdbc.
The advice for interacting with relational databases is to actually have a dedicated, well-tuned thread pool and isolate the blocking code there. The rest of your application can be highly reactive and operate on just a handful of threads, but from a pragmatic point of view, just deal with JDBC because trying to replace it with something more reactive could bring a lot of pain for no apparent gain.
PostgreSQL has a peculiar built-in messaging mechanism available through the
NOTIFY extended SQL statement. Every PostgreSQL client can send a notification to a virtual channel via a SQL statement, as shown here:
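For example, via plain JDBC (the connection URL and the JSON payload are illustrative):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class Notify {
    public static void main(String[] args) throws Exception {
        try (Connection connection =
                     DriverManager.getConnection("jdbc:postgresql://localhost/mydb");
             Statement statement = connection.createStatement()) {
            statement.execute("NOTIFY my_channel");                      // empty notification
            statement.execute("NOTIFY my_channel, '{\"answer\": 42}'");  // with a string payload
        }
    }
}
```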
In this example, we send an empty notification followed by an arbitrary string (it can be JSON, XML, or data in any other encoding) to a channel named
my_channel. A channel is basically a queue managed inside the PostgreSQL database engine. Interestingly, sending a notification is part of a transaction, so delivery happens after the commit, and in the case of rollback, the message is discarded.
To consume notifications from a particular channel, we first must
LISTEN on that channel. When we begin listening on a given connection, the only way to obtain notifications is by periodic polling by using the
getNotifications() method. This introduces random latency and unnecessary CPU load and context switches; unfortunately, that is how the API was designed. The complete blocking example follows:
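A sketch of the blocking variant against the PostgreSQL JDBC driver (pgjdbc); the connection URL and sleep interval are illustrative:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

public class BlockingListen {
    public static void main(String[] args) throws Exception {
        try (Connection connection =
                     DriverManager.getConnection("jdbc:postgresql://localhost/mydb")) {
            try (Statement statement = connection.createStatement()) {
                statement.execute("LISTEN my_channel");
            }
            PGConnection pgConnection = connection.unwrap(PGConnection.class);
            while (true) {
                // getNotifications() does not block; it returns pending
                // notifications or null, hence the periodic polling
                PGNotification[] notifications = pgConnection.getNotifications();
                if (notifications != null) {
                    for (PGNotification notification : notifications) {
                        System.out.println(
                                notification.getName() + ": " + notification.getParameter());
                    }
                }
                Thread.sleep(500);
            }
        }
    }
}
```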
Not only do we block the client thread, but we are also forced to keep one JDBC connection open because listening is tied to a particular connection. At least we can listen on many channels at the same time. The preceding code is quite verbose but straightforward. After calling
LISTEN, we enter an endless loop asking for new notifications. Calling
getNotifications() is destructive, meaning that it discards returned notifications, so calling it twice will not return the same events.
getName() is the channel name (for example, my_channel), whereas
getParameter() returns optional event contents such as a JSON payload.
The API is horribly old-fashioned, using
null to signal no pending notifications, and arrays rather than collections. Let’s make it more Rx-friendly. In the absence of any push-based mechanism for notifications, we are forced to reimplement polling using the nonblocking
interval() operator. There are many tiny details that allow our custom
Observable to behave properly, which we will discuss further after the example (which is not yet complete):
First, we postpone opening a connection to the database until someone actually subscribes. Also, to avoid connection leaks (a serious problem in any application that deals with JDBC directly), we ensure that the connection is closed when the
Subscriber unsubscribes. Moreover, when an error occurs in the stream, unsubscription happens and the connection is therefore closed as well.
Now we are ready to call
listenOn() and begin receiving notifications over an open connection. If an exception is thrown when this statement is executed, it will be caught and handled by calling
subscriber.onError(e). Not only does it seamlessly propagate the error to the subscriber but it also forces the closing of the connection. But if the
LISTEN request succeeds, the next invocation of
getNotifications() will return all events sent afterward.
We do not want to block any thread so instead, we create an inner Observable with
pollForNotifications(). We subscribe to that
Observable with the same
Subscriber but wrapped with
Subscribers.wrap() so that
onStart() is not executed twice on that Subscriber.
Periodically, we examine the contents of
getNotifications() by first wrapping it in an awkward
Observable<PGNotification[]>. Because the returned array of
PGNotification can be null, we then
filter() out nulls and via
flatMapIterable() unwrap the array, first converting it to a List with
Arrays::asList. The only reason for including
tryGetNotification() is for handling checked exceptions.
One last tiny bit of implementation is
publish().refCount(), close to the end of the first method. These two methods make it possible to share a single JDBC connection among multiple subscribers. Without them, every new subscriber would open a new connection and listen on it, which is quite wasteful. Additionally,
refCount() keeps track of the number of subscribers and when the last one unsubscribes it physically closes the database connection.
Remembering that a single connection can listen on multiple channels, as an exercise try to implement
observe() so that it reuses the same connection among all subscribers and all channels in which they are interested. The current implementation shares a connection if you call
observe() once and subscribe multiple times, whereas it could easily reuse the same connection all the way down, even for subscribers interested in different channels.
There is no practical reason for exploring
NOTIFY in PostgreSQL; there are faster, more robust, and more reliable message queues on the market. But this case study showed how to use JDBC in a more reactive scenario, even when it still requires a little bit of blocking or polling.
Java 8, apart from lambda expressions, new
java.time API, and multiple smaller additions, also brought us the
CompletableFuture class. This utility significantly improves the
Future interface known since Java 5. Pure
Future represents an asynchronous operation running in the background, typically obtained from an ExecutorService.
Future’s API is overly simplistic, forcing developers to block on
Future.get() invocation pretty much all the time.
It is not possible to efficiently implement waiting for the first of the
Futures to complete without busy waiting. Other options to compose
Futures are nonexistent. The following section will briefly describe how
CompletableFuture works. Later, we will implement a thin interoperability layer between CompletableFuture and Observable.
CompletableFuture successfully bridges that gap by providing dozens of useful methods, almost all of which are nonblocking and composable. We got used to the fact that Observable’s
map() asynchronously transforms input events on the fly. Moreover,
Observable.flatMap() allows us to replace single event with another
Observable, chaining asynchronous tasks. A similar operation is possible with CompletableFuture.
Imagine a service that needs two unrelated pieces of information: a User and a
GeoLocation. Knowing both of these, we ask several independent travel agencies to find a Flight and we book the
Ticket in whichever provider was first to return — promoting the fastest and most reactive one. This last requirement is especially difficult to implement, and prior to Java 8 required
ExecutorCompletionService to effectively find the fastest task:
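To illustrate, here is a self-contained sketch (agency names and delays are made up) showing how ExecutorCompletionService surfaces whichever task finishes first:

```java
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FastestAgency {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // the completion service queues finished Futures in completion order,
        // not submission order
        ExecutorCompletionService<String> completionService =
                new ExecutorCompletionService<>(pool);
        completionService.submit(() -> { Thread.sleep(200); return "agency-A"; });
        completionService.submit(() -> { Thread.sleep(20);  return "agency-B"; });
        completionService.submit(() -> { Thread.sleep(100); return "agency-C"; });
        // take() blocks the calling thread until the first task completes
        String fastest = completionService.take().get();
        System.out.println(fastest);
        pool.shutdownNow();
    }
}
```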
ExecutorCompletionService was not particularly popular among Java developers, and with
CompletableFuture it is no longer needed. But first notice how we wrap
ExecutorService with an ExecutorCompletionService so that we can later poll for completed tasks as they arrive. With vanilla
ExecutorService we would get a bunch of
Future objects having no idea which one will complete first, so
ExecutorCompletionService was useful. Yet, we still have to sacrifice one extra thread to block waiting for
TravelAgencies’ responses. Also, we do not take advantage of concurrency where it is possible (loading the User and
GeoLocation at the same time).
Our refactoring will turn all methods into their asynchronous counterparts and later combine
CompletableFutures appropriately. This way our code is fully nonblocking (main thread completes almost immediately) and we parallelize as much as possible:
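A minimal sketch of the wrapping step, with hypothetical findUser()/locate() lookups standing in for the real blocking methods:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncWrappers {
    // hypothetical blocking lookups standing in for the real services
    static String findUser() { sleep(100); return "user-42"; }
    static String locate()   { sleep(100); return "geo:52.2,21.0"; }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) { throw new RuntimeException(e); }
    }

    public static void main(String[] args) {
        // supplyAsync() submits each blocking call to ForkJoinPool.commonPool(),
        // so both lookups can run concurrently
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(AsyncWrappers::findUser);
        CompletableFuture<String> locationFuture = CompletableFuture.supplyAsync(AsyncWrappers::locate);
        // thenCombine() fires only when both futures are done; join() here is
        // just to print the result from main()
        String combined = userFuture
                .thenCombine(locationFuture, (user, location) -> user + " @ " + location)
                .join();
        System.out.println(combined);
    }
}
```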
We simply wrapped blocking methods with an asynchronous CompletableFuture. The
supplyAsync() method takes an optional
Executor as an argument. If not specified, it uses the globally defined one in
ForkJoinPool.commonPool(). It is advisable to always use a custom
Executor, but for the purpose of this sample, we take advantage of the default one. Just keep in mind that the default is shared among all
CompletableFutures, parallel streams, and a few other less obvious places.
First, we asynchronously begin fetching the User and
GeoLocation. These two operations are independent and can run concurrently. However, we need the results of both in order to proceed, of course without blocking and wasting the client thread. This is what
thenCombine() is doing — it takes two
CompletableFutures (user and location) and invokes a callback when both are completed, asynchronously. Interestingly, the callback can return a value, which will become the new content of resulting
CompletableFuture, as demonstrated here:
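A runnable sketch of such a chain (the fixed epoch value and UTC zone are chosen purely for determinism):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.concurrent.CompletableFuture;

public class CombineExample {
    public static void main(String[] args) {
        // epoch milliseconds arriving asynchronously (a fixed value for determinism)
        CompletableFuture<Long> epochFuture = CompletableFuture.supplyAsync(() -> 0L);
        // thenApply() transforms the future value on the fly, like Observable.map()
        CompletableFuture<Instant> instantFuture = epochFuture.thenApply(Instant::ofEpochMilli);
        CompletableFuture<ZoneId> zoneFuture = CompletableFuture.supplyAsync(() -> ZoneId.of("UTC"));
        // thenCombine() waits for both futures and wraps the callback's result
        // back into a CompletableFuture<ZonedDateTime>
        CompletableFuture<ZonedDateTime> dateTimeFuture =
                instantFuture.thenCombine(zoneFuture, ZonedDateTime::ofInstant);
        System.out.println(dateTimeFuture.join());
    }
}
```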
CompletableFuture shares a lot of similarities with Observable. The thenApply() method performs an on-the-fly transformation of whatever the
Future brings, just like
Observable.map(). In our example, we transform a
CompletableFuture<Long> into a CompletableFuture<Instant> by supplying a function from Long to Instant (namely
Instant::ofEpochMilli). Later, we take two futures (the resulting instant future and
zoneFuture) and run a transformation on their future values, namely Instant and
ZoneId, using the
thenCombine() method. This transformation returns
ZonedDateTime, but because most of the
CompletableFuture operators are nonblocking, we get
CompletableFuture<ZonedDateTime> in return, again very similar to Observable.
Going back to the previous example with booking tickets, the following snippet of code is probably quite obscure:
We need to start asynchronous operation on each
TravelAgency by calling
searchAsync(). We immediately get back a
List<CompletableFuture<Flight>>; this is a very inconvenient data structure if all we need is the first
Future to complete. There are methods like
CompletableFuture.allOf() and CompletableFuture.anyOf(). The latter is exactly what we need from a semantic point of view — it takes a group of
CompletableFutures and returns a
CompletableFuture that completes when the very first underlying
CompletableFuture completes, discarding all the others. This is very similar to Observable’s amb() operator.
Unfortunately the syntax of
anyOf() is very awkward. First, it accepts an array (varargs), and it always returns
CompletableFuture<Object>, not of whatever type the underlying
Futures were, such as
Flight. We can use it, but it becomes quite messy:
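A sketch of the messy anyOf() version (String stands in for Flight, and an already-completed future plays the fastest agency):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AnyOfExample {
    public static void main(String[] args) {
        List<CompletableFuture<String>> flights = Arrays.asList(
                CompletableFuture.supplyAsync(AnyOfExample::slowSearch),
                CompletableFuture.completedFuture("LH-2334"));   // the "fastest agency"
        // anyOf() demands an array and erases the element type to Object...
        CompletableFuture<Object> firstCompleted =
                CompletableFuture.anyOf(flights.toArray(new CompletableFuture[0]));
        // ...so an unchecked cast is needed to recover the Flight (here: String)
        String fastest = (String) firstCompleted.join();
        System.out.println(fastest);
    }

    static String slowSearch() {
        try { Thread.sleep(500); } catch (InterruptedException ignored) { }
        return "LOT-17";
    }
}
```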
The trick with
Stream.reduce() is as follows. There exists a
CompletableFuture.applyToEither() operator that accepts two
CompletableFutures and applies a given transformation on the first one to complete. The
applyToEither() transformation is extremely useful when you have two homogeneous tasks and you only care about the first one to complete. In the following example, we query
User on two different servers: primary and secondary. Whichever finishes first, we apply a simple transformation that extracts the date of the user’s birth. The second
CompletableFuture is not interrupted, but its result is discarded. Obviously, we end up with a CompletableFuture of the user’s birth date.
applyToEither() can only work on two
CompletableFutures, whereas the quirky
anyOf() can take an arbitrary number. Fortunately, we can call
applyToEither() on the first two
Futures and then take the result (fastest out of the first two) and combine it with the third upstream
Future (fastest out of the first three). By iteratively calling
applyToEither(), we get the
CompletableFuture representing the fastest overall.
This handy trick can be efficiently implemented using the
reduce() operator. One last caveat is the
identity() method from
Function. This is a requirement of
applyToEither(); we must provide a transformation that deals with the first result to come. If the result is supposed to be left as-is, we can use an identity function, which can also be written as
f -> f or
(Flight f) -> f.
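Putting it together, the reduce() trick might look like this (again with String standing in for Flight, and a pre-completed future as the fastest agency):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ReduceEither {
    public static void main(String[] args) {
        List<CompletableFuture<String>> flights = Arrays.asList(
                slow("LOT-17"),
                CompletableFuture.completedFuture("LH-2334"),   // the "fastest agency"
                slow("BA-905"));
        // pairwise applyToEither(): fastest of (1,2), then fastest of that and 3...
        CompletableFuture<String> fastest = flights.stream()
                .reduce((f1, f2) -> f1.applyToEither(f2, Function.identity()))
                .orElseThrow(IllegalStateException::new);
        System.out.println(fastest.join());
    }

    static CompletableFuture<String> slow(String flight) {
        return CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            return flight;
        });
    }
}
```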
Finally, we implemented
CompletableFuture<Flight> that completes when the fastest
TravelAgency responds, asynchronously. There is still a tiny issue with the result of
thenCombine(). Whatever the transformation passed to
thenCombine() returns is then wrapped back into
CompletableFuture. In our case, we return
CompletableFuture<Flight>, so the type of the
thenCombine() result is:
CompletableFuture<CompletableFuture<Flight>>. Double wrapping is a common issue with
Observable as well, and we can use the same trick to fix it in both cases:
flatMap()! But remember that just as
map() is called thenApply() in CompletableFuture,
flatMap() is called thenCompose().
Normally, we use
thenCompose() to chain an asynchronous computation, but here we simply unwrap the incorrect type. Keep in mind that
thenCompose() expects the return type of the supplied transformation to be
CompletableFuture. But because the internal type is already a
Future, using an
identity() function, or simply
x -> x, fixes the type by unwrapping the internal Future.
Finally, when we have
CompletableFuture<Flight> (abbreviated to
flightFuture), we can call
bookAsync(), which takes a
Flight as an argument:
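For illustration (String stands in for Flight and Ticket, and bookAsync() is a stub):

```java
import java.util.concurrent.CompletableFuture;

public class BookingExample {
    // hypothetical asynchronous booking; String stands in for Flight and Ticket
    static CompletableFuture<String> bookAsync(String flight) {
        return CompletableFuture.supplyAsync(() -> "Ticket for " + flight);
    }

    public static void main(String[] args) {
        CompletableFuture<String> flightFuture = CompletableFuture.supplyAsync(() -> "LH-2334");
        // thenCompose() chains an async step whose function itself returns a
        // CompletableFuture, so the result is not doubly wrapped
        CompletableFuture<String> ticketFuture = flightFuture.thenCompose(BookingExample::bookAsync);
        System.out.println(ticketFuture.join());
    }
}
```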
thenCompose() was used more naturally when calling
bookAsync(). That method returns
CompletableFuture<Ticket>, so to avoid double wrapping, we choose
thenCompose() instead of thenApply().
The factory method
Observable.from(Future<T>) that returns
Observable<T> already exists. However, because of the limitations of the old
Future<T> API, it has several shortcomings, the biggest one being blocking on
Future.get() internally. Classic
Future<T> implementations have no way of registering callbacks and processing them asynchronously, therefore they are quite useless in reactive applications.
CompletableFuture, in contrast, is a totally different story. Semantically, you can treat
CompletableFuture like an
Observable that has the following characteristics:
- It is hot: the computation behind a
CompletableFuture starts eagerly, regardless of whether someone registered any callbacks such as thenApply()
- It is cached: the background computation behind a
CompletableFuture is triggered once eagerly and the result is forwarded to all registered callbacks. Moreover, if a callback is registered after completion, it is immediately invoked with the completed value (or exception)
- It emits exactly one element or exception: in principle, a
Future<T> completes exactly once (or never) with a value of type
T or an exception. This matches the contract of an Observable emitting exactly one value
First, we would like to write a utility function that takes a
CompletableFuture<T> and returns a properly behaving Observable<T>.
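A minimal sketch of such a bridge (the method name observe() is our own):

```java
import java.util.concurrent.CompletableFuture;

import rx.Observable;

class Futures {
    // whenComplete() passes either a value or an exception;
    // exactly one of the two is non-null
    static <T> Observable<T> observe(CompletableFuture<T> future) {
        return Observable.create(subscriber ->
                future.whenComplete((value, exception) -> {
                    if (exception != null) {
                        subscriber.onError(exception);
                    } else {
                        subscriber.onNext(value);
                        subscriber.onCompleted();
                    }
                }));
    }
}
```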
To be notified about both successful and failed completion, we use the
CompletableFuture.whenComplete() method. It receives two mutually exclusive parameters. If
exception is not
null, it means that the underlying
Future failed. Otherwise, we take the successful
value. In both cases, we notify the incoming Subscriber.
Notice that if the subscription appears after the
CompletableFuture completed (one way or the other), the callbacks are executed immediately.
CompletableFuture caches the result as soon as it completes so that callbacks registered afterward are invoked immediately within the calling thread.
It is tempting to register an unsubscription handler that tries to cancel
CompletableFuture in case of unsubscription:
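The tempting, flawed variant might look like this (a sketch; the method name is our own):

```java
import java.util.concurrent.CompletableFuture;

import rx.Observable;
import rx.subscriptions.Subscriptions;

class BadFutures {
    // DON'T do this: the future may back several Observables with several
    // Subscribers each; one early unsubscription would cancel it for everyone
    static <T> Observable<T> observe(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            subscriber.add(Subscriptions.create(() -> future.cancel(true)));
            future.whenComplete((value, exception) -> {
                if (exception != null) {
                    subscriber.onError(exception);
                } else {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            });
        });
    }
}
```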
This is a bad idea. We can create many
Observables based on one
CompletableFuture, and every
Observable can have multiple
Subscribers. If just one
Subscriber decides to unsubscribe prior to
Future’s completion, cancellation will affect all other Subscribers.
In Rx terminology, CompletableFuture is hot and cached. It begins computation immediately, whereas an
Observable will not start computation until someone actually subscribes. Having that in mind, with the following tiny utility we can further improve our API:
Obviously, if the API you are consuming supports
Observable from the beginning, you do not need all these extra layers of adapters. However, if all you have at your disposal are
CompletableFutures, converting them to
Observables is efficient and safe. The advantage of RxJava is a much more concise implementation of our initial problem:
The client code using RxJava API seems less noisy and easier to read. Rx naturally supports “futures with multiple values” in the form of streams. If you still find identity transformation
x -> x inside
flatMap() a little bit intimidating, we can always replace it with
zipWith() using a
Pair helper container:
At this point, you should understand why extra
x -> x is no longer needed.
zipWith() takes two independent
Observables and asynchronously waits for both of them. Java has no built-in pairs or tuples, thus we must provide a transformation that will take the events from both streams and combine them into a single
Pair<User, Location> object. The resulting stream is an Observable<Pair<User, Location>>, which becomes the input for the downstream operators. Later, we use
flatMap() to search every travel agency concurrently for a given User and Location.
flatMap() does the unwrapping for us (from a syntactic perspective), so the resulting stream is a simple
Observable<Flight>. Naturally, in both cases we do
first() to process only the first
Flight occurring upstream (the fastest TravelAgency wins).
In some cases, the API you are using might support
CompletableFuture but not RxJava. Such a situation can be quite common, especially taking into account that the former is part of the JDK, whereas the latter is a library. Under these circumstances, it would be nice to convert an Observable to a
CompletableFuture. There are two ways to implement this transformation:
Use this when you expect just a single item emitted from stream — for example, when Rx wraps a method invocation or request/response pattern. The
CompletableFuture<T> completes successfully when the stream completes with exactly one emitted value. Obviously, the future completes with an exception when the stream completed with an error or when it did not emit exactly one item.
In this scenario, the
CompletableFuture completes when all events from upstream
Observable are emitted and the stream completes. This is just a special case of the first transformation, as you will see later.
You can implement the first scenario easily using the following utility:
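A sketch of such a utility, assuming the toFuture() name used in the text:

```java
import java.util.concurrent.CompletableFuture;

import rx.Observable;

class FutureUtils {
    // sketch: subscribing here forces evaluation of a cold Observable
    static <T> CompletableFuture<T> toFuture(Observable<T> observable) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        observable
                .single()   // fails the stream unless exactly one element is emitted
                .subscribe(promise::complete, promise::completeExceptionally);
        return promise;
    }
}
```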
Before diving into the implementation, keep in mind that this transformation has an important side effect: it subscribes to
Observable, thus forcing evaluation and computation of cold
Observables. Moreover, each invocation of this transformer will subscribe again; it is just a design choice that you must be aware of.
Apart from that, the implementation is quite interesting. First, we force
Observable to emit precisely one element using
single(), throwing an exception otherwise. If this single event is emitted and a stream completes, we invoke
CompletableFuture.complete(). It turns out one can create
CompletableFuture from scratch without any backing thread pool and asynchronous task. It is still a
CompletableFuture, but the only way to complete it and signal all registered callbacks is by calling
complete() explicitly. This is an efficient way of exchanging data asynchronously, at least when RxJava is not available.
In case of failure, we can trigger an error in all registered callbacks by calling
CompletableFuture.completeExceptionally(). As surprising as it is, this is the entire implementation. The Future returned from toFuture() behaves as if it had some task attached in the background, whereas in reality we explicitly complete it.
The transformation from Observable<T> to
CompletableFuture<List<T>> is embarrassingly simple:
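A self-contained sketch (with a toFuture()-style helper at hand, the body shrinks to toFuture(observable.toList())):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import rx.Observable;

class FutureListUtils {
    // toList() emits exactly one List<T> on completion, so the single-element
    // transformation applies directly
    static <T> CompletableFuture<List<T>> toFutureList(Observable<T> observable) {
        CompletableFuture<List<T>> promise = new CompletableFuture<>();
        observable
                .toList()
                .subscribe(promise::complete, promise::completeExceptionally);
        return promise;
    }
}
```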
Interoperability between CompletableFuture and Observable is quite useful. The former is properly designed but lacks the expressiveness and richness of the latter. Therefore, if you are forced to deal with
CompletableFuture in an otherwise RxJava-based application, apply these simple transformations as soon as possible to provide a consistent and predictable API. Make sure you understand the difference between eager (hot)
CompletableFuture and Observable, which is lazy by default.
Observable is a stream, potentially infinite, and all operators are described in terms of streams. But similar to a
List<T>, which can happen to have exactly one element, a certain
Observable<T> can by definition always emit exactly one event. It would be quite confusing to use a
List<T> to hold exactly one element all the time; therefore, we simply use
Optional<T> for that. In the land of RxJava there is a special abstraction for
Observables emitting exactly one element, and it is called Single.
Single<T> is basically a container for a future value of type T or an
Exception. In that regard,
CompletableFuture from Java 8 is the closest cousin of
Single. But unlike CompletableFuture,
Single is lazy and does not begin producing its value until someone actually subscribes.
Single is typically used for APIs known to return a single value (duh!) asynchronously and with high probability of failure.
Single is a great candidate for request–response types of communication involving I/O, like a network call. The latency is typically high compared to normal method invocation and failures are inevitable. Moreover, because
Single is lazy and asynchronous, we can apply all sorts of tricks to improve latency and resilience, such as invoking independent actions concurrently and combining responses together.
Single reduces the confusion of APIs returning
Observable by providing type-level guidance:
It’s difficult to predict the contract of the preceding method. Does it return just one temperature measurement and complete? Or maybe it streams temperatures infinitely? Even worse, it might complete without any events under some circumstances. If the return type were
Single<Float>, we would have known immediately what output to expect.
Single is fairly similar to
Observable in terms of the operators it supports, so we will not spend much time on them here. Instead, we will briefly compare them to
Observable’s counterparts and focus on use cases for
Single. There are few ways to create a
Single, beginning with the constant just():
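For instance, a trivial sketch:

```java
import rx.Single;

public class SingleJust {
    public static void main(String[] args) {
        // subscribe() on Single takes two callbacks: onSuccess and onError
        Single.just("Hello, world!")
              .subscribe(
                      System.out::println,
                      Throwable::printStackTrace);
    }
}
```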
There are no overloaded versions of
just() that take multiple values—after all,
Single can hold only one value by definition. Also the
subscribe() method takes two arguments rather than three. There is simply no point in having an onCompleted() callback, because a
Single completes with either a value (first callback) or an exception (second callback).
Listening on completion alone is equivalent to subscribing for a single value. Additionally, we included the
observeOn() operator, which works exactly the same as its
Observable peer. The same applies to
subscribeOn(). Finally, you can use the
error() operator to create a
Single that always completes with a given exception.
Let’s implement a more real-life scenario of making an HTTP request. After making an HTTP request we can provide a callback implementation that will be invoked asynchronously whenever a response or error comes back. This fits very nicely into how
Single is created:
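A sketch using the callback-based AsyncHttpClient (com.ning), whose completion handler maps naturally onto Single's contract:

```java
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;

import rx.Single;

class HttpSingles {
    private final AsyncHttpClient asyncHttpClient = new AsyncHttpClient();

    Single<Response> fetch(String address) {
        return Single.create(subscriber ->
                asyncHttpClient.prepareGet(address).execute(
                        new AsyncCompletionHandler<Response>() {
                            @Override
                            public Response onCompleted(Response response) {
                                subscriber.onSuccess(response);   // exactly once
                                return response;
                            }

                            @Override
                            public void onThrowable(Throwable t) {
                                subscriber.onError(t);            // or exactly once here
                            }
                        }));
    }
}
```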
Single.create() looks similar to
Observable.create() but it has some important constraints; you must call either
onSuccess() once or
onError() once. Technically, it is also possible to have a
Single that never completes, but multiple
onSuccess() invocations are not allowed. Speaking of
Single.create(), you can also try
Single.fromCallable() that accepts
Callable<T> and returns
Single<T>. As simple as that.
Going back to our HTTP client example, when a response is back, we let subscribers know by calling
onSuccess() or propagate the exception with
onError() in case of asynchronous failure callback. You can use
Single in a similar fashion to Observable.
Response.getResponseBody() throws an IOException, so we cannot simply say:
map(Response::getResponseBody). But at least we see how
Single.flatMap() works. By wrapping the potentially dangerous
getResponseBody() method with
Single<String>, we make sure potential failure is encapsulated and clearly expressed in type system.
Single.flatMap() works as you might expect, knowing
Observable.flatMap(): if the second stage of computation (
this::body in our case) fails, the entire Single fails, as well.
Interestingly, Single has a flatMap() but no
filter(). Can you guess why?
filter() could potentially filter out content of
Single<T> if it did not meet a certain predicate. But a
Single<T> must have exactly one item inside whereas
filter() could result in a
Single having none.
Single has its very own
BlockingSingle created with
Single.toBlocking(). Analogously to BlockingObservable, creating a
BlockingSingle<T> does not yet block. However, calling
value() on it blocks until the value of type T (a
String containing the response body in our example) is available. In the case of an exception, it will be rethrown from value().
rx.Single would be useless if it did not provide composable operators. The most important operator you will come across is
Single.zip(), which works the same way as
Observable.zip() but has simpler semantics.
Single always emits precisely one value (or exception) so the result of
Single.zip() (or a
Single.zipWith() instance version) is always exactly one pair/tuple.
zip() is basically a way of creating a third
Single when two underlying Singles complete.
Suppose that you are rendering an article to be displayed on your website. Three independent operations need to be made to fulfill the request: reading the article content from the database, asking a social media website for the number of likes collected so far, and updating the read count metric. A naive implementation not only performs these three actions sequentially, but also risks unacceptable latency if any of the steps are slow. With
Single every step is modeled separately:
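A sketch of how the three steps could be modeled (class layout and SQL are illustrative; the two placeholder bodies stand in for real asynchronous calls):

```java
import org.springframework.jdbc.core.JdbcTemplate;

import rx.Single;
import rx.schedulers.Schedulers;

class ArticleService {
    private final JdbcTemplate jdbcTemplate;

    ArticleService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    Single<String> content(int articleId) {
        return Single.fromCallable(() ->
                        jdbcTemplate.queryForObject(
                                "SELECT content FROM articles WHERE id = ?",
                                String.class, articleId))
                // JDBC blocks, so shift the work off the subscribing thread
                .subscribeOn(Schedulers.io());
    }

    Single<Integer> likes(int articleId) {
        // placeholder: imagine a nonblocking HTTP call (e.g., via RxNetty) here
        return Single.just(0);
    }

    Single<Void> updateReadCount(int articleId) {
        // placeholder: a side effect with no return value but significant latency
        return Single.just(null);
    }
}
```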
As an example, we show how
Single can be created using
fromCallable by passing a lambda expression. This utility is quite useful because it manages error handling for us. The
content() method uses a handy
JdbcTemplate from the Spring framework to unobtrusively load the article content from the database. JDBC is an inherently blocking API, so we explicitly call
subscribeOn() to make it asynchronous.
The implementation of
updateReadCount() is omitted. You can imagine
likes() making an asynchronous HTTP request to some API using RxNetty.
updateReadCount() is interesting because it has a
Single<Void> type. This suggests it performs some side effect that has no return value but significant latency.
Yet, we still might want to be notified about possible failures that happened asynchronously. RxJava has a special type for such cases, as well:
Completable. This type represents an operation that either completes without a result or yields an exception, asynchronously.
Combining these three operations with
zip is quite straightforward:
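A possible shape of that combination (a sketch: a plain String stands in for the Document type discussed below):

```java
import rx.Single;

public class Render {

    // Combine the three independent Singles; the zip function fires
    // only when all of them have completed
    static Single<String> render(Single<String> content,
                                 Single<Integer> likes,
                                 Single<Void> updateReadCount) {
        return Single.zip(content, likes, updateReadCount,
                (body, likeCount, ignored) ->   // the Void result is never used
                        "<html>" + body + " (" + likeCount + " likes)</html>");
    }

    public static void main(String[] args) {
        String html = render(
                Single.just("article"),
                Single.just(42),
                Single.<Void>fromCallable(() -> null)
        ).toBlocking().value();
        System.out.println(html);   // <html>article (42 likes)</html>
    }
}
```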
Single.zip() takes three
Singles (it has overloaded versions for anything between two and nine instances) and invokes our custom function when all three are completed. The outcome of this custom function is then placed back in a
Single<Document> instance that we can further transform. You should be aware that the
Void result is never used by the transformation. This means that we wait until
updateReadCount() completes, yet we do not need its (empty) result. This might be a requirement or can suggest possible optimization: building an HTML document might work just as well if
updateReadCount() is executed asynchronously without waiting for its completion or failure.
Now imagine what happens if invoking
likes() fails or takes an unacceptably long time to complete (which is actually much worse). Without reactive extensions, rendering the HTML either fails entirely or takes a considerable amount of time. However, our implementation is not much better in that regard. Single supports multiple operators like
onErrorResumeNext() that enhance resiliency and error handling. All of these operators behave the same way as their Observable counterparts.
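For example, a sketch of guarding the likes() call (the one-second timeout and the fallback value of 0 are illustrative assumptions, not prescriptions from the text):

```java
import java.util.concurrent.TimeUnit;
import rx.Single;

public class Resilient {

    // Guard an unreliable Single against failures and excessive latency
    static Single<Integer> reliableLikes(Single<Integer> likes) {
        return likes
                .timeout(1, TimeUnit.SECONDS)   // fail fast rather than wait indefinitely
                .onErrorReturn(err -> 0);       // degrade gracefully to zero likes
    }

    public static void main(String[] args) {
        Single<Integer> broken = Single.error(new RuntimeException("social API down"));
        System.out.println(reliableLikes(broken).toBlocking().value());   // 0
    }
}
```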
From the perspective of the type system, Observable and Single are unrelated. This basically means that when an
Observable is required, you cannot use a
Single, and vice versa. However, there are two situations in which conversion between the two makes sense:
- When we use an Observable that emits one value and a completion notification (or an error notification)
- When Single is missing certain operators available in Observable; cache() is one such example
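A round-trip conversion along these lines might look as follows (a sketch; the counter is added only to demonstrate that the value is generated once):

```java
import java.util.concurrent.atomic.AtomicInteger;
import rx.Single;

public class CacheDemo {

    static final AtomicInteger generations = new AtomicInteger();

    static Single<String> cachedAnswer() {
        return Single
                .fromCallable(() -> {
                    generations.incrementAndGet();  // counts how many times the value is produced
                    return "42";
                })
                .toObservable()  // Single is missing cache(), so convert to Observable first...
                .cache()         // ...cache there...
                .toSingle();     // ...and convert back
    }

    public static void main(String[] args) {
        Single<String> answer = cachedAnswer();
        System.out.println(answer.toBlocking().value());  // 42, generated now
        System.out.println(answer.toBlocking().value());  // 42 again, served from cache
        System.out.println(generations.get());            // 1
    }
}
```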
We use the
cache() operator here so that the
Single generates “42” only once, for the first subscriber.
Single.toObservable() is a safe and easy to understand operator. It takes a
Single<T> instance and converts it to an
Observable<T>, emitting one element immediately followed by a completion notification (or an error notification if that is how the underlying Single ended).
Observable.toSingle() requires more attention:
toSingle() will throw an exception saying that “Observable emitted too many elements” if the underlying
Observable emits more than one element. Similarly, expect “Observable emitted no items” if the Observable is empty:
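For instance (a sketch; the exact exception types are implementation details, so only success and failure are distinguished here):

```java
import rx.Observable;

public class ToSingleDemo {
    public static void main(String[] args) {
        // Exactly one element: the conversion succeeds
        System.out.println(Observable.just(1).toSingle().toBlocking().value());   // 1

        // Empty source: fails upon subscription
        Observable.<Integer>empty()
                .toSingle()
                .subscribe(
                        item -> System.out.println("Got: " + item),
                        err -> System.out.println("Empty source: " + err.getMessage()));

        // More than one element: also fails upon subscription
        Observable.just(1, 2)
                .toSingle()
                .subscribe(
                        item -> System.out.println("Got: " + item),
                        err -> System.out.println("Too many: " + err.getMessage()));
    }
}
```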
Now you might think that when the toObservable() and
toSingle() operators are used close to each other, the latter is safe, but that does not have to be the case. For example, an intermediate
Observable might duplicate or discard an event emitted by the Single.
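As a sketch of such discarding (ignoreElements() is used here as one way an intermediate Observable can swallow the event):

```java
import rx.Single;

public class Discard {

    // The intermediate Observable throws away the Single's only event
    static Single<Integer> discarded() {
        return Single.just(1)
                .toObservable()
                .ignoreElements()   // discards the element; the stream completes empty
                .toSingle();        // lazy: nothing fails yet
    }

    public static void main(String[] args) {
        // The failure surfaces only once someone subscribes
        discarded().subscribe(
                item -> System.out.println("Got: " + item),
                err -> System.out.println("Error: " + err.getMessage()));
    }
}
```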
In the preceding code, the Observable simply discards the single value emitted by the Single. Therefore, when the
toSingle() operator is applied, all it can see is an Observable that completes without any elements. One thing to keep in mind is that the
toSingle() operator, just like all of the other operators we have discovered so far, is lazy. The exception about the
Single not emitting precisely one event will appear only when someone actually subscribes.
Having two abstractions, Observable and Single, it is important to distinguish between them and understand when to use which one. Just like with data structures, one size does not fit all. You should use
Single in the following scenarios:
- An operation must complete with some particular value or an exception. For example, calling a web service always results in either a response from an external server or some sort of exception
- There is no such thing as a stream in your problem domain; using Observable would be misleading and overkill
- Observable is too heavyweight and you measured that Single is faster in your particular problem
On the other hand, you should prefer
Observable for these circumstances:
- You model some sort of events (messages, GUI events) that by definition occur multiple times, possibly infinitely often
- Or, entirely the opposite: you expect either a single value or none at all before completion
The latter case is quite interesting. Do you think it makes sense for
a findById(int) method on some repository to return
Single<Record> rather than
Observable<Record>? Well, it sounds reasonable: we look up an item by ID (which suggests there is just one such
Record). However, there is no guarantee that a
Record exists for every ID we supply. Therefore, this method can technically return nothing, modeled as an empty
Observable<Record>, which is perfectly capable of handling empty streams followed by a completion notification. But what about
Single? It must either complete with a single value (a
Record) or with an exception. It is your design choice if you want to model a nonexistent record with an exception, but this is often considered bad practice. Deciding whether a missing record for a given ID is a truly exceptional situation is not the responsibility of the repository layer.
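A sketch of such a repository (with a String standing in for Record and an in-memory lookup instead of a real data store):

```java
import rx.Observable;

public class Repository {

    // An empty Observable models a missing Record without resorting to exceptions
    static Observable<String> findById(int id) {
        return id == 1
                ? Observable.just("record-1")
                : Observable.empty();   // no Record for this ID: just complete
    }

    public static void main(String[] args) {
        Repository.findById(2).subscribe(
                rec -> System.out.println("Found: " + rec),
                err -> System.err.println("Error: " + err),
                () -> System.out.println("Completed"));   // empty stream completes cleanly
    }
}
```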
In this chapter, we covered quite advanced topics related to designing entirely reactive applications, showing real-life techniques for implementing event-driven systems without introducing accidental complexity. We have shown several benchmarks demonstrating how well RxJava performs together with a nonblocking networking stack like Netty. You are not forced to use such advanced libraries, but it certainly pays off when you strive for maximum throughput on commodity servers.