Introducing a new library, technology, or paradigm to an application, be it a greenfield or a legacy codebase, must be a careful decision. RxJava is no exception. This process is not straightforward and requires a significant mindset shift, so we will carefully move from an imperative to a functional and reactive style. Many libraries in Java projects these days simply add bloat without giving anything in return. However, you will see not only how RxJava simplifies traditional projects but also what kinds of benefits it brings to legacy platforms.
RxJava can be a significant step forward in terms of architectural consistency and robustness. You do not need to commit to reactive style top-to-bottom—this is too risky and requires too much work in the beginning. But Rx can be introduced at any layer, without breaking an application as a whole.
Unless your platform was built recently in JVM frameworks like Play, Akka actors, or maybe Vert.x, you are probably on a stack with a servlet container on one hand, and JDBC or web services on the other. Between them, there is a varying number of layers implementing business logic, which we will not refactor all at once; instead, let’s begin with a simple example. The following class represents a trivial repository abstracting us from a database:
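The repository listing itself is not reproduced here. A minimal sketch of what such a blocking repository might look like follows; query() is a hypothetical stand-in for a JDBC call that returns canned rows, so the sketch runs without a database.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical blocking repository. query() stands in for a JDBC call and
// returns canned rows so the sketch runs without a database.
class PersonDao {

    static final class Person {
        final String name;

        Person(String name) { this.name = name; }

        @Override
        public String toString() { return name; }
    }

    // Blocks until every row has been fetched and returns them all at once.
    List<Person> listPeople() {
        return query("SELECT * FROM PEOPLE");
    }

    // Hypothetical helper: a real version would execute the SQL and map
    // each ResultSet row to a Person.
    private List<Person> query(String sql) {
        return Arrays.asList(new Person("Alice"), new Person("Bob"));
    }
}
```

The caller gets a fully materialized List, which is exactly the pull-based contract the rest of the section gradually replaces.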
How is this mundane Dao relevant here? Observable is not only a pipe pushing events downstream. You can treat Observable<T> as a data structure, dual to Iterable<T>. They both hold items of type T, but they provide radically different interfaces. So, it shouldn't come as a surprise that you can simply replace one with the other:
At this point, we made a breaking change to the existing API. Depending on how big your system is, such incompatibility might be a major concern. Thus, it is important to bring RxJava into your API as early as possible. Obviously, we are working with an existing application, so that can't be the case.
If you are combining RxJava with existing, blocking, and imperative code, you might have to translate an Observable into a plain collection. This transformation is rather unpleasant: it requires blocking on an Observable while waiting for its completion. Until the Observable completes, we are not capable of creating a collection. BlockingObservable is a special type that makes it easier to work with an Observable in a nonreactive environment. BlockingObservable should be your last choice when working with RxJava, but it is inevitable when combining blocking and nonblocking code.
We want to take baby steps rather than massive refactoring, so let’s keep the scope of changes as minimal as possible. The client code could look like this:
We can imagine the marshal() method pulling data from the people collection and serializing it to JSON. That's no longer the case: we can't simply pull items from an Observable whenever we want. The Observable is in charge of producing (pushing) items and notifying subscribers, if any. This radical change can be easily circumvented with BlockingObservable. This handy class is entirely independent from Observable and can be obtained via the toBlocking() method.
The blocking variant of Observable has superficially similar methods, such as forEach(). BlockingObservable is much more convenient in blocking environments that are inherently unprepared for the asynchronous nature of Observable. Operators on BlockingObservable typically block (wait) until the underlying Observable completes. This strongly contradicts the main concept of Observables: that everything is likely asynchronous, lazy, and processed on the fly. For example, Observable.forEach() asynchronously receives events from the Observable as they come in, whereas BlockingObservable.forEach() blocks until all events are processed and the stream completes. Also, exceptions are no longer propagated as values (events) but instead are rethrown in the calling thread.
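The two blocking behaviors described above can be observed directly. This is a small sketch assuming RxJava 1.x (package rx) on the classpath; the streams are synthetic.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x on the classpath (package rx).
class BlockingDemo {

    // BlockingObservable.forEach() returns only once the stream has completed,
    // so the list is fully populated by the time this method returns.
    static List<Integer> collectBlocking() {
        List<Integer> seen = new ArrayList<>();
        Observable.range(1, 3)
                .toBlocking()
                .forEach(seen::add);
        return seen;
    }

    // An onError notification is rethrown as an ordinary exception
    // in the thread that called the blocking operator.
    static String errorInCallingThread() {
        try {
            Observable.<Integer>error(new IllegalStateException("boom"))
                    .toBlocking()
                    .forEach(x -> { });
            return "no error";
        } catch (IllegalStateException e) {
            return "caught: " + e.getMessage();
        }
    }
}
```

Runtime exceptions such as IllegalStateException are rethrown as is; checked exceptions arriving via onError() are wrapped in a RuntimeException.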
In our case, we want to transform Observable<Person> back into List<Person> to limit the scope of refactoring:
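A sketch of the Rx-flavored repository together with the unwrapping chain the next paragraphs walk through. It assumes RxJava 1.x; Person is simplified to String and query() is a hypothetical JDBC stand-in.

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.observables.BlockingObservable;

// Assumes RxJava 1.x. Person is simplified to String; query() is a
// hypothetical stand-in for JDBC.
class RxPersonDao {

    Observable<String> listPeople() {
        // Still eager: query() runs as soon as listPeople() is called.
        return Observable.from(query("SELECT * FROM PEOPLE"));
    }

    private List<String> query(String sql) {
        return Arrays.asList("Alice", "Bob", "Carol");
    }

    // Client code that still needs a plain List, with every type spelled out.
    static List<String> clientCode(RxPersonDao dao) {
        Observable<String> peopleStream = dao.listPeople();
        Observable<List<String>> peopleList = peopleStream.toList();
        BlockingObservable<List<String>> peopleBlocking = peopleList.toBlocking();
        return peopleBlocking.single();
    }
}
```

In practice the four statements are usually chained fluently: dao.listPeople().toList().toBlocking().single().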
I intentionally left all intermediate types explicit in order to explain what happens. After refactoring to Rx, our API returns Observable<Person> peopleStream. This stream can potentially be fully reactive, asynchronous, and event driven, which doesn't match at all what we need: a static List. As the first step, we turn Observable<Person> into Observable<List<Person>> with toList(). This lazy operator buffers all Person events and keeps them in memory until the onCompleted() event is received.
The resulting stream completes immediately after emitting a single List item. Again, this operator is asynchronous; it doesn't wait for all events to arrive but instead lazily buffers all values. The awkward-looking Observable<List<Person>> peopleList is then converted to BlockingObservable<List<Person>> with toBlocking(). BlockingObservable is a good idea only when you must provide a blocking, static view of your otherwise asynchronous Observable.
Observable.from(List<T>) converts a normal pull-based collection into an Observable, whereas toBlocking() does quite the opposite. You might ask yourself why we need two abstractions for blocking and nonblocking operators. The authors of RxJava figured out that being explicit about the synchronous versus asynchronous nature of the underlying operator is too crucial to be left to the JavaDoc. Having two unrelated types ensures that you always work with the appropriate data structure.
BlockingObservable is your weapon of last resort; normally, you should compose and chain plain Observables as long as possible. However, for the purpose of this exercise, let's escape from Observable right away. The last operator, single(), drops observables altogether and extracts one, and only one, item we expect to receive from BlockingObservable<T>. A similar operator, first(), returns a value of T and discards whatever is left. single(), on the other hand, makes sure there are no more pending events in the underlying Observable before terminating. This means single() will block waiting for the onCompleted() notification.
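The difference between first() and single() shows up on streams that never complete or that carry more than one item. A sketch assuming RxJava 1.x:

```java
import rx.Observable;

// Assumes RxJava 1.x.
class FirstVersusSingle {

    // first() takes the first item and unsubscribes; it does not wait for completion.
    // single() on this same stream would block forever waiting for onCompleted().
    static int firstOfNeverEnding() {
        return Observable.just(1)
                .concatWith(Observable.<Integer>never())  // never completes
                .toBlocking()
                .first();
    }

    static int singleOfOne() {
        return Observable.just(42).toBlocking().single();
    }

    // single() fails when the stream turns out to contain more than one item.
    static boolean singleRejectsTwo() {
        try {
            Observable.just(1, 2).toBlocking().single();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```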
You might think that we went through all this hassle of wrapping and unwrapping Observable for no apparent reason. Remember, this was just the first step. The next transformation will introduce some laziness. Our code as it stands right now always executes query("...") eagerly and wraps the result with an Observable. As you know by now, Observables (especially cold ones) are lazy by definition. As long as no one subscribes, they just represent a stream that never had a chance to begin emitting values. Most of the time you can call methods returning Observable, and as long as you don't subscribe, no work will be done.
An Observable is like a Future in that it promises a value will appear in the future. But as long as you don't request it, a cold Observable will not even begin emitting. From that perspective, Observable is more similar to java.util.function.Supplier<T>, generating values of type T on demand. Hot Observables are different because they emit values whether you are listening or not, but we are not considering them right now. The mere existence of an Observable does not indicate a background job or any side effect, unlike a Future, which almost always suggests some operation running concurrently.
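The Supplier versus Future contrast can be checked with plain JDK types. In this sketch a Supplier does nothing until get() is called, whereas CompletableFuture.supplyAsync() starts its task immediately even though nobody ever asks for the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain JDK: a Supplier does work on demand, a CompletableFuture is
// already running by the time you hold it.
class LazyVersusEager {

    // Returns {runs before get(), runs after get()}.
    static int[] supplierRuns() {
        AtomicInteger runs = new AtomicInteger();
        Supplier<String> supplier = () -> {
            runs.incrementAndGet();
            return "value";
        };
        int before = runs.get();  // nothing has happened yet
        supplier.get();           // the work runs only now, on request
        return new int[] {before, runs.get()};
    }

    // supplyAsync() schedules the task immediately; we never call get()
    // or join(), yet the task still runs in the background.
    static boolean futureRunsWithoutBeingAsked() {
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture.supplyAsync(() -> {
            started.countDown();
            return "value";
        });
        try {
            return started.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```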
So how do we make our Observable lazy? The simplest technique is to wrap an eager Observable with defer(). Observable.defer() takes a lambda expression (a factory) that can produce an Observable. The underlying Observable is eager, so we want to postpone its creation. defer() will wait until the last possible moment to actually create the Observable; that is, until someone actually subscribes to it. This has some interesting implications.
Because the Observable is lazy, calling listPeople() has no side effects and almost no performance footprint. No database is queried yet. You can treat Observable<Person> as a promise but without any background processing happening yet. Notice that there is no asynchronous behavior at the moment, just lazy evaluation.
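A minimal sketch of the deferred repository, assuming RxJava 1.x. query() is a hypothetical JDBC stand-in that counts its calls, which makes the laziness visible.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;

// Assumes RxJava 1.x. query() is a hypothetical JDBC stand-in that counts calls.
class LazyPersonDao {

    final AtomicInteger queries = new AtomicInteger();

    Observable<String> listPeople() {
        // defer() postpones calling query() until someone subscribes.
        return Observable.defer(() -> Observable.from(query("SELECT * FROM PEOPLE")));
    }

    private List<String> query(String sql) {
        queries.incrementAndGet();
        return Arrays.asList("Alice", "Bob");
    }
}
```

A side effect of this design is that every subscription runs the factory again, so two subscribers mean two queries.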
If you have never programmed in functional languages, you might be confused about why laziness is so important and groundbreaking. It turns out that such behavior is quite useful and can improve the quality and freedom of your implementation considerably. For example, you no longer have to pay attention to which resources are fetched, when, and in what order. RxJava will load them only when they are absolutely needed.
As an example, take this trivial fallback mechanism that we have all seen so many times:
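A sketch of such an imperative fallback in plain Java. recommend(), bestSeller(), and display() are hypothetical services; a flag simulates the recommendation service being down, and display() just records what would be shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical services: recommend() may fail, bestSeller() is the cheap,
// cacheable fallback, and display() records what would be shown.
class BookRecommender {

    static final class Book {
        final String title;
        Book(String title) { this.title = title; }
    }

    private final boolean recommendationsDown;
    final List<String> displayed = new ArrayList<>();

    BookRecommender(boolean recommendationsDown) {
        this.recommendationsDown = recommendationsDown;
    }

    void bestBookFor(String person) {
        Book book;
        try {
            book = recommend(person);
        } catch (Exception e) {
            book = bestSeller();  // degrade gracefully
        }
        display(book.title);
    }

    Book recommend(String person) {
        if (recommendationsDown) {
            throw new IllegalStateException("recommendation service unavailable");
        }
        return new Book("Book for " + person);
    }

    Book bestSeller() { return new Book("Bestseller"); }

    void display(String title) { displayed.add(title); }
}
```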
You probably think there is nothing wrong with such a construct. In this example, we try to recommend the best book for a given person, but in case of failures, we degrade gracefully and display the bestseller. The assumption is that fetching a bestseller is faster and can be cached. But what if you could add error handling declaratively so that try-catch blocks aren't obscuring real logic?
We are only exploring RxJava so far, so I left all these intermediate values and types. In real life, bestBookFor() would look more like this:
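A sketch of the declarative version, assuming RxJava 1.x. rxRecommend() and rxBestSeller() are hypothetical lazy wrappers around the same simulated services as in the imperative version.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x. rxRecommend() and rxBestSeller() are hypothetical
// lazy wrappers around the book services.
class RxBookRecommender {

    static final class Book {
        final String title;
        Book(String title) { this.title = title; }
        String getTitle() { return title; }
    }

    private final boolean recommendationsDown;
    final List<String> displayed = new ArrayList<>();

    RxBookRecommender(boolean recommendationsDown) {
        this.recommendationsDown = recommendationsDown;
    }

    void bestBookFor(String person) {
        rxRecommend(person)
                .onErrorResumeNext(rxBestSeller())  // on failure, switch to the fallback stream
                .map(Book::getTitle)
                .subscribe(this::display);
    }

    Observable<Book> rxRecommend(String person) {
        return Observable.defer(() -> recommendationsDown
                ? Observable.<Book>error(new IllegalStateException("recommendation service unavailable"))
                : Observable.just(new Book("Book for " + person)));
    }

    // Creating this Observable is cheap: nothing is fetched unless the fallback is subscribed to.
    Observable<Book> rxBestSeller() {
        return Observable.defer(() -> Observable.just(new Book("Bestseller")));
    }

    void display(String title) { displayed.add(title); }
}
```

Thanks to defer(), passing rxBestSeller() eagerly to onErrorResumeNext() costs nothing when the recommendation succeeds.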
This code is beautifully concise and readable. First, find a recommendation for the person. In case of failure (onErrorResumeNext()), proceed with a bestseller. No matter which one succeeded, map() extracts the title, which is then displayed.
onErrorResumeNext() is a powerful operator that intercepts exceptions happening upstream, swallows them, and subscribes to the provided backup Observable. This is how Rx implements a try-catch clause.
SELECT * FROM PEOPLE is not really a state-of-the-art SQL query. First, you should not fetch all columns blindly, but fetching all rows is even more damaging. Our old API is not capable of paging results, that is, viewing just a subset of a table. A paged version might look like this, again in a traditional enterprise application:
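A sketch of such a paged, 0-based API in plain Java. The in-memory "table" is hypothetical and replaces the real SQL (something along the lines of LIMIT/OFFSET); the query counter is there only to make later laziness visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical paged repository: an in-memory table replaces the real SQL.
class PagedPersonDao {

    static final int PAGE_SIZE = 10;
    private final List<String> table = new ArrayList<>();
    int queries = 0;

    PagedPersonDao(int rows) {
        for (int i = 0; i < rows; i++) {
            table.add("Person " + i);
        }
    }

    // 0-based page number; returns an empty list past the end of the table.
    List<String> listPeople(int page) {
        queries++;
        int from = page * PAGE_SIZE;
        if (from >= table.size()) {
            return Collections.emptyList();
        }
        int to = Math.min(from + PAGE_SIZE, table.size());
        return new ArrayList<>(table.subList(from, to));
    }
}
```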
The author of this API was merciless: we don't have the freedom to choose any range of records; we can only operate on 0-based page numbers. However, in RxJava, thanks to laziness, we can actually simulate reading an entire database starting from a given page:
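A sketch of this seemingly infinite recursion, assuming RxJava 1.x. listPeople(page) is the same kind of hypothetical in-memory paged query (25 rows, 10 per page), repeated so the block is self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x. listPeople(page) is a hypothetical paged query over an
// in-memory table (10 rows per page) that counts how often it is called.
class LazyPaging {

    static final int PAGE_SIZE = 10;
    private final List<String> table = new ArrayList<>();
    int queries = 0;

    LazyPaging(int rows) {
        for (int i = 0; i < rows; i++) {
            table.add("Person " + i);
        }
    }

    List<String> listPeople(int page) {
        queries++;
        int from = page * PAGE_SIZE;
        if (from >= table.size()) {
            return Collections.emptyList();
        }
        return new ArrayList<>(table.subList(from, Math.min(from + PAGE_SIZE, table.size())));
    }

    // The next page is requested (lazily, through defer()) only after the current
    // page is exhausted and the subscriber still wants more items.
    Observable<String> allPeople(int initialPage) {
        return Observable.defer(() -> Observable.from(listPeople(initialPage)))
                .concatWith(Observable.defer(() -> allPeople(initialPage + 1)));
    }
}
```

Note that this naive version has no stop condition: asking for more rows than the table holds keeps querying empty pages forever, so consumers must bound it, for example with take().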
This code snippet lazily loads the initial page of database records, for example 10 items. If no one subscribes, even this first query is not invoked. If there is a subscriber that only consumes a few initial elements (e.g., allPeople(0).take(3)), RxJava will unsubscribe automatically from our stream and no more queries are executed.
So what happens when we request, say, 11 items but the first listPeople() call returned only 10? Well, RxJava figures out that the initial Observable is exhausted but the consumer is still hungry. Luckily, it sees the concatWith() operator, which basically says: when the Observable on the left completes, rather than propagating the completion notification to subscribers, subscribe to the Observable on the right and continue as if nothing happened, as depicted in the following marble diagram:
In other words, concatWith() can join together two Observables so that when the first one completes, the second one takes over. In a.concatWith(b).subscribe(...), the subscriber first receives all events from a, followed by all events from b. In this case, the subscriber first receives the initial 10 items, followed by the subsequent 10.
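The ordering guarantee, including the fact that b is not even subscribed to until a completes, can be made visible with doOnSubscribe(). A sketch assuming RxJava 1.x:

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x. Records the order of subscriptions and items.
class ConcatWithDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<String> a = Observable.just("a1", "a2")
                .doOnSubscribe(() -> log.add("subscribe a"));
        Observable<String> b = Observable.just("b1", "b2")
                .doOnSubscribe(() -> log.add("subscribe b"));
        // b is subscribed to only after a has completed.
        a.concatWith(b).subscribe(log::add);
        return log;
    }
}
```

The resulting log reads subscribe a, a1, a2, subscribe b, b1, b2.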
However, look carefully: there is an alleged infinite recursion in our code! allPeople() calls allPeople(initialPage + 1) without any stop condition. This is a recipe for StackOverflowError in most languages, but not here. Again, calling allPeople() is always lazy; therefore, the moment you stop listening (unsubscribe), this recursion is over. Technically, concatWith() can still produce a StackOverflowError here.
The technique of lazily loading data chunk by chunk is quite useful because it allows you to concentrate on business logic, not on low-level plumbing. We already see some benefits of applying RxJava even on a small scale. Designing an API with Rx in mind doesn't influence the entire architecture, because we can always fall back to BlockingObservable and Java collections. But it's better to have a wide range of possibilities that we can further trim down if necessary.
There are more ways to implement lazy paging with RxJava. If you think about it, the simplest way of loading paged data is to load everything and then take whatever we need. It sounds silly, but thanks to laziness it is feasible. First we generate all possible page numbers and then we request loading each and every page individually:
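A sketch of this "generate every page number" approach, assuming RxJava 1.x and the same kind of hypothetical in-memory paged query. It shows both flattening variants discussed below.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x. listPeople(page) is a hypothetical paged query over an
// in-memory table (10 rows per page) that counts how often it is called.
class AllPagesPaging {

    static final int PAGE_SIZE = 10;
    private final List<String> table = new ArrayList<>();
    int queries = 0;

    AllPagesPaging(int rows) {
        for (int i = 0; i < rows; i++) {
            table.add("Person " + i);
        }
    }

    List<String> listPeople(int page) {
        queries++;
        int from = page * PAGE_SIZE;
        if (from >= table.size()) {
            return Collections.emptyList();
        }
        return new ArrayList<>(table.subList(from, Math.min(from + PAGE_SIZE, table.size())));
    }

    Observable<List<String>> allPages() {
        return Observable
                .range(0, Integer.MAX_VALUE)          // every page number, emitted on demand
                .map(this::listPeople)                // one query per page number
                .takeWhile(page -> !page.isEmpty());  // the first empty page ends the stream
    }

    Observable<String> viaConcatMap() {
        return allPages().concatMap(page -> Observable.from(page));
    }

    Observable<String> viaConcatMapIterable() {
        return allPages().concatMapIterable(page -> page);
    }
}
```

Operators may prefetch a few page numbers ahead, so the exact query count for a partial read can vary slightly, but it stays bounded by takeWhile().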
If this were not RxJava, the preceding code would take an enormous amount of time and memory, basically loading the entire database into memory. But because Observable is lazy, no query has reached the database yet. Moreover, if we find an empty page, it means all further pages are empty as well (we reached the end of the table). Therefore, we use takeWhile() rather than filter(); filter() would merely skip empty pages and keep querying forever. To flatten the resulting Observable<List<Person>> into Observable<Person>, we can use concatMap(). concatMap() requires a transformation from List<Person> to Observable<Person>, executed for each page. Alternatively, we can try concatMapIterable(), which does the same thing, but the transformation should return an Iterable<Person> for each upstream value (which happens to be a List<Person> already).
No matter which approach you choose, all transformations on Person objects are lazy. As long as you limit the number of records you want to process (for example, with take()), the final Observable<Person> will invoke listPeople() as late as possible.
I don’t often see explicit concurrency in enterprise applications. Most of the time a single request is handled by a single thread. The same thread does the following:
- Accepts TCP/IP connection
- Parses HTTP request
- Calls a controller or servlet
- Blocks on database call
- Processes results
- Encodes response (e.g., in JSON)
- Pushes raw bytes back to the client
This layered model affects user latency when the backend makes several independent requests, for instance to a database. They are performed sequentially, whereas one could easily parallelize them. Moreover, scalability is affected.
For example, in Tomcat there are 200 threads by default in the executor responsible for handling requests. This means that we can't process more than 200 requests concurrently. In case of a sudden but short burst of traffic, incoming connections are queued and the server responds with higher latency. However, this situation can't last forever, and Tomcat will eventually begin rejecting incoming traffic.
For the time being, let’s stay with traditional architecture. Executing every step of request handling within a single thread has some benefits, for example improved cache locality and minimal synchronization overhead. Unfortunately, in classic applications, because overall latency is the sum of each layer’s latencies, one malfunctioning component can have a negative impact on total latency. Moreover, sometimes there are many steps that are independent from one another and can be executed concurrently. For example, we invoke multiple external APIs or execute several independent SQL queries.
The JDK has quite good support for concurrency, especially since Java 5 with ExecutorService and Java 8 with CompletableFuture. Nonetheless, it is not as widely used as it could be. For example, let's look at the following program with no concurrency whatsoever:
And on the client side:
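A sketch of both the blocking service and its client in plain Java. All four methods are hypothetical; in real life each would hit a database or a remote API, whereas here they only record the order in which they are called. The flight number and passenger id are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical blocking services that record the order of calls.
class TravelAgency {

    static final class Flight {
        final String number;
        Flight(String number) { this.number = number; }
    }

    static final class Passenger {
        final long id;
        Passenger(long id) { this.id = id; }
    }

    static final class Ticket {
        final Flight flight;
        final Passenger passenger;
        Ticket(Flight flight, Passenger passenger) { this.flight = flight; this.passenger = passenger; }
    }

    final List<String> calls = new ArrayList<>();

    Flight lookupFlight(String flightNo) { calls.add("lookupFlight"); return new Flight(flightNo); }
    Passenger findPassenger(long id) { calls.add("findPassenger"); return new Passenger(id); }
    Ticket bookTicket(Flight flight, Passenger passenger) { calls.add("bookTicket"); return new Ticket(flight, passenger); }
    void sendEmail(Ticket ticket) { calls.add("sendEmail"); }

    // Client side: four steps, executed strictly one after another.
    void client() {
        Flight flight = lookupFlight("LOT 783");
        Passenger passenger = findPassenger(42);
        Ticket ticket = bookTicket(flight, passenger);
        sendEmail(ticket);
    }
}
```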
Again, this is quite typical, classic blocking code, similar to what you can find in many applications. But if you look carefully from a latency perspective, the preceding code snippet has four steps, and the first two are independent from each other. Only the third step (bookTicket()) needs results from both lookupFlight() and findPassenger(). There is an obvious opportunity to take advantage of concurrency. Yet very few developers will actually go down this path because it requires awkward thread pools, Futures, and callbacks.
The rx- methods do exactly the same thing and in the same way; that is, they are blocking by default. We haven't gained anything yet, apart from a more verbose API from the client perspective:
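A sketch of hypothetical rx- wrappers and the corresponding client code, assuming RxJava 1.x. The wrappers use defer() so nothing runs until subscription; without a Scheduler, everything still executes in the subscribing thread.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x. The rx- methods are hypothetical lazy wrappers around
// the blocking calls; without a Scheduler they still run in the caller's thread.
class RxTravelAgency {

    static final class Flight {
        final String number;
        Flight(String number) { this.number = number; }
    }

    static final class Passenger {
        final long id;
        Passenger(long id) { this.id = id; }
    }

    static final class Ticket {
        final Flight flight;
        final Passenger passenger;
        Ticket(Flight flight, Passenger passenger) { this.flight = flight; this.passenger = passenger; }
    }

    final List<String> calls = new ArrayList<>();

    Flight lookupFlight(String flightNo) { calls.add("lookupFlight"); return new Flight(flightNo); }
    Passenger findPassenger(long id) { calls.add("findPassenger"); return new Passenger(id); }
    Ticket bookTicket(Flight flight, Passenger passenger) { calls.add("bookTicket"); return new Ticket(flight, passenger); }
    void sendEmail(Ticket ticket) { calls.add("sendEmail"); }

    Observable<Flight> rxLookupFlight(String flightNo) {
        return Observable.defer(() -> Observable.just(lookupFlight(flightNo)));
    }

    Observable<Passenger> rxFindPassenger(long id) {
        return Observable.defer(() -> Observable.just(findPassenger(id)));
    }

    // Only assembles the pipeline; no blocking method is called here.
    Observable<Ticket> ticket() {
        Observable<Flight> flight = rxLookupFlight("LOT 783");
        Observable<Passenger> passenger = rxFindPassenger(42);
        return flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
    }

    // Subscribing is what finally triggers lookupFlight(), findPassenger(), and bookTicket().
    void client() {
        ticket().subscribe(this::sendEmail);
    }
}
```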
To proceed with bookTicket(), we need concrete instances of both Flight and Passenger. It is tempting to just block on these two Observables by using the toBlocking() operator. However, we would like to avoid blocking as much as possible to reduce resource consumption (especially memory) and allow greater concurrency.
Another poor solution is to .subscribe() on the flight and passenger Observables and somehow wait for both callbacks to finish. It's fairly straightforward when the Observable is blocking, but if callbacks appear asynchronously and you need to synchronize some global state while waiting for both of them, this quickly becomes a nightmare.
Also, a nested subscribe() is not idiomatic. The idiomatic way of subscribing to multiple Observables at the same time is zip() (or zipWith()).
You might perceive zip as a way to join two independent streams of data pairwise. But far more often, zip is simply used to join together two single-item Observables. ob1.zipWith(ob2, ...).subscribe(...) essentially means receiving an event when both ob1 and ob2 are done (each has emitted an event on its own). So whenever you see zip, it's more likely that someone is simply making a join step on two or more Observables that had forked paths of execution. zip is a way to asynchronously wait for two or more values, no matter which one appears last.
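The "no matter which one appears last" property can be checked with two single-item streams that finish at different times. This sketch assumes RxJava 1.x; timer() emits on the computation Scheduler after the given delay.

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

// Assumes RxJava 1.x. Two single-item streams that complete at different times.
class ZipJoinDemo {

    static String join(long leftDelayMs, long rightDelayMs) {
        Observable<String> left = Observable.timer(leftDelayMs, TimeUnit.MILLISECONDS).map(tick -> "left");
        Observable<String> right = Observable.timer(rightDelayMs, TimeUnit.MILLISECONDS).map(tick -> "right");
        // zipWith() emits once both sides have produced a value, whichever is last.
        return left.zipWith(right, (l, r) -> l + "+" + r)
                .toBlocking()
                .single();
    }
}
```

Both join(10, 100) and join(100, 10) yield "left+right": the combining function always receives the values in declaration order, regardless of arrival order.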
So let's get back to flight.zipWith(passenger, this::bookTicket) (a shorter syntax using a method reference instead of an explicit lambda, as in the code sample). The reason I keep all of the type information rather than fluently joining expressions is that I want you to pay attention to return types. flight.zipWith(passenger, ...) doesn't simply invoke a callback when both flight and passenger are done; it returns a new Observable, which you should immediately recognize as a lazy placeholder for data. Amazingly, at this point in time no computation has started yet, either. We simply wrapped a few data structures together, but no behavior was triggered. As long as no one subscribes to Observable<Ticket>, RxJava won't run any backend code. This is what finally happens in the last statement: ticket.subscribe() explicitly asks for a Ticket.
To understand the flow of execution, it's useful to look bottom up. We subscribed to ticket, so RxJava must transparently subscribe to both flight and passenger. At this point the real logic happens. Because both Observables are cold and no concurrency is yet involved, the first subscription to flight invokes the blocking lookupFlight() method right in the calling thread. When lookupFlight() is done, RxJava can subscribe to passenger. However, it has already received a Flight instance from the synchronous flight. Subscribing to passenger then calls findPassenger() in a blocking fashion and receives a Passenger instance. At this juncture, data flows back downstream. Instances of Flight and Passenger are combined using the provided lambda (bookTicket) and passed to ticket.subscribe().
This sounds like a lot of work considering it behaves and works essentially just like our blocking code in the beginning. But now we can declaratively apply concurrency without changing any logic. If our business methods returned Future<Flight> (or CompletableFuture<Flight>, it doesn't really matter), two decisions would have been made for us:
- The underlying invocation of lookupFlight() already began and there is no place for laziness. We don't block on such a method, but work has already started
- We have no control over concurrency whatsoever; it is the method implementation that decides whether a Future task is invoked in a thread pool, a new thread per request, and so on
RxJava gives users more control. Just because Observable<Flight> wasn't implemented with concurrency in mind does not mean that we cannot apply it later. Real-world Observables are typically asynchronous already, but in rare cases you must add concurrency to an existing Observable. The consumers of our API, not the implementors, are free to choose the threading mechanism in the case of a synchronous Observable. All of this is achieved by using the subscribeOn() operator:
At any point before subscribing, we can inject the subscribeOn() operator and provide a so-called Scheduler instance. In this case, I used the Schedulers.io() factory method, but we can just as well use a custom ExecutorService and quickly wrap it with a Scheduler (Schedulers.from()). When subscription occurs, the lambda expression passed to Observable.create() is executed within the supplied Scheduler rather than the client thread.
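A sketch that makes the effect of subscribeOn() observable, assuming RxJava 1.x. blockingLookup() is a hypothetical blocking call that waits (up to 1 second) for the other lookup to start as well, so it reports "concurrently" only if both really run at the same time. fromCallable() is used here instead of create() purely for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;

// Assumes RxJava 1.x. blockingLookup() is hypothetical: it waits up to 1 s for the
// other lookup to start too, which succeeds quickly only if both run concurrently.
class SubscribeOnDemo {

    private final CountDownLatch bothStarted = new CountDownLatch(2);

    private String blockingLookup(String what) {
        bothStarted.countDown();
        boolean concurrent;
        try {
            concurrent = bothStarted.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            concurrent = false;
        }
        return what + (concurrent ? " concurrently" : " sequentially")
                + " on " + Thread.currentThread().getName();
    }

    Observable<String> rxLookupFlight() {
        return Observable.fromCallable(() -> blockingLookup("flight"))
                .subscribeOn(Schedulers.io());  // the call now happens on an io() thread
    }

    Observable<String> rxFindPassenger() {
        return Observable.fromCallable(() -> blockingLookup("passenger"))
                .subscribeOn(Schedulers.io());
    }

    // zipWith() subscribes to both; thanks to subscribeOn() both lookups start at once.
    String bookTicket() {
        return rxLookupFlight()
                .zipWith(rxFindPassenger(), (flight, passenger) -> flight + " | " + passenger)
                .toBlocking()
                .single();
    }
}
```

Removing the two subscribeOn() calls makes both lookups run one after another in the calling thread, and each then reports "sequentially" after waiting out the timeout.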
How does the Scheduler change the runtime behavior of our program? Remember that the zip() operator subscribes to two or more Observables and waits for pairs (or tuples). When subscription occurs asynchronously, all upstream Observables can call their underlying blocking code concurrently. If you now run your program, lookupFlight() and findPassenger() will begin execution immediately and concurrently when ticket.subscribe() is invoked. Then, bookTicket() will be applied as soon as the slower of the aforementioned Observables emits a value.
Talking about slowness, you can declaratively apply a timeout as well, for when a given Observable does not emit any value in the specified amount of time:
As always, errors are propagated downstream rather than thrown arbitrarily. So if the lookupFlight() method takes more than 100 milliseconds, you will end up with a TimeoutException rather than an emitted value sent downstream to every subscriber.
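A sketch of a declarative timeout, assuming RxJava 1.x. delay() simulates a hypothetical lookupFlight() of configurable duration; onErrorReturn() is used only to turn the TimeoutException notification into a readable value.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import rx.Observable;

// Assumes RxJava 1.x. delay() simulates a hypothetical lookupFlight()
// that takes lookupMillis to respond.
class TimeoutDemo {

    static String lookup(long lookupMillis) {
        Observable<String> flight = Observable.just("flight")
                .delay(lookupMillis, TimeUnit.MILLISECONDS);
        return flight
                .timeout(100, TimeUnit.MILLISECONDS)  // error if nothing arrives within 100 ms
                .onErrorReturn(e -> e instanceof TimeoutException ? "timed out" : "failed: " + e)
                .toBlocking()
                .single();
    }
}
```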
We ended up with two methods running concurrently without much effort, assuming that your API is already Rx-driven. But we cheated a little bit with bookTicket() still returning Ticket, which definitely means it is blocking. Even if booking a ticket were extremely fast, it would still be worth declaring it as returning an Observable, just to make the API easier to evolve. The evolution might mean adding concurrency or using it in fully nonblocking environments. Going the opposite way, retrofitting a blocking signature later, is often challenging and requires lots of extra resources. Also, it is very difficult to predict how methods like rxBookTicket() will evolve; if they ever touch the network or filesystem, not to mention a database, it is worth wrapping them with an Observable indicating possible latency at the type level:
Now zipWith() returns an awkward Observable<Observable<Ticket>> and the code no longer compiles. A good rule of thumb is that whenever you see a double-wrapped type (for example, Optional<Optional<...>>), there is a flatMap() invocation missing somewhere.
zipWith() takes a pair (or more generally a tuple) of events, applies a function taking these events as arguments, and puts the result into the downstream Observable as is. This is why we saw Observable<Ticket> first, but now it's Observable<Observable<Ticket>>: Observable<Ticket> is the result of our supplied function. There are two ways to overcome this problem. One way is by using an intermediate pair returned from zipWith():
An alternative to an intermediate pair is applying flatMap() with an identity function, obs -> obs. This lambda expression is seemingly not doing anything, and it indeed would do nothing in a map() operator. But remember that flatMap() applies a function to each value inside an Observable, so in our case this function takes Observable<Ticket> as an argument. Later, the result is not placed directly in the resulting stream, like with map(). Instead, the return value (of type Observable<T>) is “flattened,” leading to an Observable<T> rather than Observable<Observable<T>>.
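Both fixes can be compared side by side. A sketch assuming RxJava 1.x, with Strings standing in for Flight, Passenger, and Ticket; rxBookTicket() is a hypothetical Observable-returning booking call, and AbstractMap.SimpleImmutableEntry is used as the intermediate pair type.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import rx.Observable;

// Assumes RxJava 1.x. Strings stand in for Flight, Passenger, and Ticket.
class NestedObservableDemo {

    static Observable<String> rxLookupFlight() { return Observable.just("flight"); }

    static Observable<String> rxFindPassenger() { return Observable.just("passenger"); }

    // Hypothetical Observable-returning booking call.
    static Observable<String> rxBookTicket(String flight, String passenger) {
        return Observable.defer(() -> Observable.just("ticket(" + flight + ", " + passenger + ")"));
    }

    // zipWith() emits whatever the function returns, as is: an Observable of Observables.
    static Observable<Observable<String>> nested() {
        return rxLookupFlight().zipWith(rxFindPassenger(), NestedObservableDemo::rxBookTicket);
    }

    // Fix 1: flatMap() with an identity function removes one level of nesting.
    static Observable<String> flattened() {
        return nested().flatMap(obs -> obs);
    }

    // Fix 2: zip into an intermediate pair, then flatMap() the booking call.
    static Observable<String> viaPair() {
        return rxLookupFlight()
                .zipWith(rxFindPassenger(),
                        (flight, passenger) -> new SimpleImmutableEntry<String, String>(flight, passenger))
                .flatMap(pair -> rxBookTicket(pair.getKey(), pair.getValue()));
    }
}
```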
It is tempting to think that subscribeOn() is the right tool for concurrency in RxJava. This operator works, but you should not see subscribeOn() (or observeOn(), yet to be described) used often. In real life, Observables usually come from asynchronous sources, so custom scheduling is often not needed at all.
In our sample application, we must now send a list of Tickets via email. But we must keep in mind the following:
- The list can be potentially quite long
- Sending an email might take several milliseconds or even seconds
- The application must keep running gracefully in case of failures, but report in the end which tickets failed to be delivered
The last requirement quickly rules out a simple tickets.forEach(this::sendEmail) because it eagerly throws an exception and won't continue with tickets that were not yet delivered. Exceptions are actually a nasty back door to the type system and, just like callbacks, are not very friendly when you want to manage them in a more robust way. That is why RxJava models them explicitly as special notifications, but be patient; we will get there. In light of the error-handling requirement, our code looks more or less like this:
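A plain-Java sketch of such a sequential loop. Tickets are simplified to String ids, and sendEmail() is a hypothetical method that fails for the ids passed to the constructor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mailer: tickets are plain ids, and sendEmail() fails
// for the ids passed to the constructor.
class SequentialMailer {

    static final class SmtpResponse { }

    private final List<String> undeliverable;

    SequentialMailer(String... undeliverable) {
        this.undeliverable = Arrays.asList(undeliverable);
    }

    SmtpResponse sendEmail(String ticket) {
        if (undeliverable.contains(ticket)) {
            throw new IllegalStateException("SMTP failure for " + ticket);
        }
        return new SmtpResponse();
    }

    // Keeps going after failures and reports the failed tickets at the end.
    List<String> sendAll(List<String> tickets) {
        List<String> failures = new ArrayList<>();
        for (String ticket : tickets) {
            try {
                sendEmail(ticket);
            } catch (Exception e) {
                System.err.println("Failed to send " + ticket + ": " + e.getMessage());
                failures.add(ticket);
            }
        }
        return failures;
    }
}
```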
However, the first two requirements aren't addressed. There is no reason why we should keep sending emails sequentially from one thread. Traditionally, we could use an ExecutorService pool for that by submitting each email as a separate task:
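A plain-JDK sketch of the two-loop approach. The mailer is hypothetical, and AbstractMap.SimpleImmutableEntry stands in for a third-party Pair<Ticket, Future<SmtpResponse>> type; responses are simplified to String.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical mailer; SimpleImmutableEntry stands in for a Pair type.
class PooledMailer {

    private final ExecutorService pool = Executors.newFixedThreadPool(10);
    private final List<String> undeliverable;

    PooledMailer(String... undeliverable) {
        this.undeliverable = Arrays.asList(undeliverable);
    }

    String sendEmail(String ticket) {
        if (undeliverable.contains(ticket)) {
            throw new IllegalStateException("SMTP failure for " + ticket);
        }
        return "250 OK";
    }

    private Future<String> sendEmailAsync(String ticket) {
        return pool.submit(() -> sendEmail(ticket));  // wrapped in a Callable<String>
    }

    List<String> sendAll(List<String> tickets) {
        // Loop 1: submit every task up front, so they all run concurrently.
        List<SimpleImmutableEntry<String, Future<String>>> tasks = tickets.stream()
                .map(ticket -> new SimpleImmutableEntry<String, Future<String>>(ticket, sendEmailAsync(ticket)))
                .collect(Collectors.toList());
        // Loop 2: wait for each result and keep only the tickets that failed.
        return tasks.stream()
                .flatMap(task -> {
                    try {
                        task.getValue().get(1, TimeUnit.SECONDS);
                        return Stream.<String>empty();
                    } catch (Exception e) {
                        System.err.println("Failed to send " + task.getKey() + ": " + e);
                        return Stream.of(task.getKey());
                    }
                })
                .collect(Collectors.toList());
    }

    void shutdown() { pool.shutdown(); }
}
```

The pool's threads are not daemon threads, so callers must invoke shutdown() when done.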
That's a fair amount of code that all Java programmers should be familiar with. Yet it seems too verbose and accidentally complex. First, we iterate over tickets and submit them to a thread pool. To be precise, we call the sendEmailAsync() helper method that submits a sendEmail() invocation wrapped in Callable<SmtpResponse> to a thread pool. To be even more precise, Callable instances are first placed in a queue in front of the thread pool; for pools created with Executors.newFixedThreadPool(), that queue is unbounded. The lack of mechanisms that slow down overly rapid submission of tasks when they cannot be processed in time led to the reactive streams and backpressure efforts.
At this point, the thread pool is already running multiple sendEmail() invocations concurrently, which is precisely what we were aiming at. The second loop goes through all Futures and tries to dereference them by blocking (get()) and waiting for completion. If get() returns successfully, we skip such a Ticket. However, if there is an exception, we return the Ticket instance that was associated with this task; we know it failed and we want to report it later.
Stream.flatMap() allows us to return zero or one elements (or actually any number), contrary to Stream.map(), which always requires exactly one.
You might be wondering why we need two loops instead of just one like this:
This is an interesting bug that is really difficult to find if you don't understand how Streams in Java 8 work. Because streams, just like Observables, are lazy, they evaluate the underlying collection one element at a time and only when a terminal operation is requested (e.g., collect(toList())). This means that a map() operation starting background tasks is not executed on all tickets immediately; rather, it runs one ticket at a time, alternating with the flatMap() operation. In other words, we start one Future, block waiting for it, start a second Future, block waiting on that, and so on. An intermediate collection is needed to force evaluation, not because of clarity or readability. After all, the List<Pair<Ticket, Future<SmtpResponse>>> type is hardly more readable.
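The bug can be observed by logging, in a single stream pipeline, when each (hypothetical) email task is submitted and when it finishes. Because the stream pushes one element at a time through map() and then flatMap(), each task completes before the next one is even submitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Single-pipeline version of the two loops: submission and waiting alternate.
class SingleStreamBug {

    static List<String> run(List<String> tickets) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            tickets.stream()
                    .map(ticket -> {
                        log.add("submit " + ticket);
                        return pool.submit(() -> log.add("done " + ticket));  // hypothetical email task
                    })
                    .flatMap(future -> {
                        try {
                            future.get();  // blocks before the next ticket is even mapped
                            return Stream.<String>empty();
                        } catch (Exception e) {
                            return Stream.of("failed");
                        }
                    })
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }
}
```

For tickets a, b, and c the log is strictly submit a, done a, submit b, done b, submit c, done c, so the pool never runs two tasks at once.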
That's plenty of work and the possibility of a mistake is high, so it's no wonder that developers are reluctant to write concurrent code on a daily basis. The little-known ExecutorCompletionService from the JDK is sometimes used when there is a pool of asynchronous tasks and we want to process them as they complete. Moreover, Java 8 brings CompletableFuture, which is entirely reactive and nonblocking. But how can RxJava help here? First, assume that an API for sending an email is already retrofitted to use RxJava:
There is no concurrency involved, just wrapping sendEmail() inside an Observable. This is a rare Observable; ordinarily you would use subscribeOn() in the implementation so that the Observable is asynchronous by default. At this point, we can iterate over all tickets as before:
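A sketch of the retrofitted API together with the failure-collecting pipeline described next, assuming RxJava 1.x. Tickets are String ids, sendEmail() is hypothetical, and rxSendEmail() uses Observable.fromCallable() as one possible synchronous wrapper (defer() would work as well).

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;

// Assumes RxJava 1.x. Tickets are plain ids; sendEmail() is hypothetical
// and fails for the ids passed to the constructor.
class RxMailer {

    private final List<String> undeliverable;

    RxMailer(String... undeliverable) {
        this.undeliverable = Arrays.asList(undeliverable);
    }

    String sendEmail(String ticket) {
        if (undeliverable.contains(ticket)) {
            throw new IllegalStateException("SMTP failure for " + ticket);
        }
        return "250 OK";
    }

    // Unusual, synchronous Observable: no concurrency, just a lazy wrapper.
    Observable<String> rxSendEmail(String ticket) {
        return Observable.fromCallable(() -> sendEmail(ticket));
    }

    List<String> sendAll(List<String> tickets) {
        return Observable.from(tickets)
                .flatMap(ticket -> rxSendEmail(ticket)
                        .flatMap(response -> Observable.<String>empty())  // discard successes
                        .doOnError(e -> System.err.println("Failed to send " + ticket + ": " + e.getMessage()))
                        .onErrorReturn(e -> ticket))                     // a failure becomes the ticket itself
                .toList()
                .toBlocking()
                .single();
    }
}
```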
It is easy to see that the inner flatMap() in our example ignores the response and returns an empty stream. In such cases, flatMap() is overkill; the ignoreElements() operator is far more efficient. ignoreElements() simply ignores all emitted values and forwards onCompleted() and onError() notifications. Because we are ignoring the actual response and just dealing with errors, ignoreElements() works great here.
All we are interested in lies inside the outer flatMap(). If it were just flatMap(this::rxSendEmail), the code would work; however, any failure emitted from rxSendEmail would terminate the entire stream. But we want to “catch” all emitted errors and collect them for later consumption. We use a similar trick to Stream.flatMap(): if a response was successfully emitted, we transform it to an empty Observable. This basically means that we discard successful tickets. However, in case of failures, we return the ticket that raised an exception. An extra doOnError() callback allows us to log the exception; of course, we could just as well add logging to the onErrorReturn() operator, but I find this separation of concerns more functional.
To remain compatible with previous implementations, we transform the Observable of failed tickets into a list (toList()), then into a BlockingObservable (toBlocking()), and finally extract that list (single()). Interestingly, even BlockingObservable remains lazy. The toBlocking() operator on its own doesn't force evaluation by subscribing to the underlying stream, and it doesn't even block. Subscription, and thus iteration and sending emails, is postponed until single() is invoked.
Note that if we replace the outer flatMap() with concatMap(), we will encounter a bug similar to the one mentioned with JDK's Stream. As opposed to flatMap() (or merge), which subscribes immediately to all inner streams, concatMap() (or concat) subscribes to one inner Observable after another. And as long as no one subscribes to an Observable, no work even begins.
So far, a simple for loop with a try-catch was replaced with a less readable and more complex Observable. However, to turn our sequential code into a multithreaded computation, we barely need to add one extra operator:
It is so noninvasive that you might find it hard to spot. One extra subscribeOn() operator causes each individual rxSendEmail() to be executed on a specified Scheduler (Schedulers.io(), in this case). This is one of the strengths of RxJava; it is not opinionated about threading, defaulting to synchronous execution but allowing seamless and almost transparent multithreading.
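A sketch of the same pipeline with the one extra operator, assuming RxJava 1.x. To make the concurrency checkable, the hypothetical sendEmail() waits (up to 1 second) until all tickets have started sending; if the sends ran one after another, every ticket would fail with "not concurrent".

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;

// Assumes RxJava 1.x. Hypothetical mailer: each sendEmail() waits up to 1 s
// until all of them have started, which only succeeds if they run concurrently.
class ConcurrentRxMailer {

    private final List<String> undeliverable;
    private final CountDownLatch allStarted;

    ConcurrentRxMailer(int ticketCount, String... undeliverable) {
        this.allStarted = new CountDownLatch(ticketCount);
        this.undeliverable = Arrays.asList(undeliverable);
    }

    String sendEmail(String ticket) throws InterruptedException {
        allStarted.countDown();
        if (!allStarted.await(1, TimeUnit.SECONDS)) {
            throw new IllegalStateException("not concurrent");
        }
        if (undeliverable.contains(ticket)) {
            throw new IllegalStateException("SMTP failure for " + ticket);
        }
        return "250 OK";
    }

    Observable<String> rxSendEmail(String ticket) {
        return Observable.fromCallable(() -> sendEmail(ticket));
    }

    List<String> sendAll(List<String> tickets) {
        return Observable.from(tickets)
                .flatMap(ticket -> rxSendEmail(ticket)
                        .flatMap(response -> Observable.<String>empty())
                        .doOnError(e -> System.err.println("Failed to send " + ticket + ": " + e.getMessage()))
                        .onErrorReturn(e -> ticket)
                        .subscribeOn(Schedulers.io()))  // the only change: each send gets its own io() worker
                .toList()
                .toBlocking()
                .single();
    }
}
```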
To wrap up, if you are implementing Observables from scratch, making them asynchronous by default is more idiomatic. That means placing subscribeOn() directly inside rxSendEmail() rather than applying it externally. Otherwise, you risk wrapping already asynchronous streams with yet another layer of schedulers.
Of course, if the producer behind an Observable is already asynchronous, it is even better, because your stream does not bind to any particular thread. Additionally, you should postpone subscribing to an Observable as late as possible, typically close to the web framework or the outside world. This changes your mindset significantly. Your entire business logic is lazy until someone actually wants to see the results.
There are numerous examples of callback-based APIs, and API designers take various approaches. Typically, we need to provide some sort of callback that the API invokes, often called an event listener. Such listeners can be replaced with relative ease by a composable Observable, which is much more robust and versatile. A traditional listener looks similar to the following class, here using JMS support in the Spring framework, but our solution is technology-agnostic:
When a JMS message is received, the JmsConsumer class must decide what to do with it. Typically, some business logic is invoked inside a message consumer. When a new component wants to be notified about such messages, it must modify JmsConsumer appropriately. Conversely, imagine an Observable<Message> that can be subscribed to by anyone.
The easiest way to convert from a push, callback-based API to Observable is to use Subjects. Every time a new JMS message is delivered, we push that message to a PublishSubject that looks like an ordinary hot Observable from the outside:
Keep in mind that Observable<Message> is hot; it begins emitting JMS messages as soon as they are consumed. If no one is subscribed at the moment, messages are simply lost.
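A technology-agnostic sketch of such a Subject bridge, assuming RxJava 1.x. onMessage() stands in for a hypothetical listener callback (for example, a JMS listener method), and messages are plain Strings. toSerialized() is applied so onNext() stays safe even if the framework calls it from several threads.

```java
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

// Assumes RxJava 1.x. onMessage() stands in for a hypothetical listener callback.
class MessageBridge {

    // toSerialized() makes onNext() safe to call from several listener threads.
    private final Subject<String, String> subject = PublishSubject.<String>create().toSerialized();

    // Called by the messaging framework for every incoming message.
    void onMessage(String message) {
        subject.onNext(message);
    }

    // Looks like an ordinary hot Observable from the outside; asObservable()
    // hides onNext() from consumers.
    Observable<String> observe() {
        return subject.asObservable();
    }
}
```

A message delivered before anyone subscribes is dropped, which is exactly the hot behavior described above.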
ReplaySubject is an alternative, but because it caches all events since the application startup, it's not suitable for long-running processes. Additionally, our message listener has a concurrency="1" parameter to ensure that the Subject is not invoked from multiple threads. As an alternative, you can use Subject.toSerialized().
As a side note, Subjects are easier to get started with but are known to be problematic after a while. In this particular case, we can easily replace the Subject with a more idiomatic RxJava Observable that uses create():
The JMS API provides two ways of receiving messages from a broker: synchronously via the blocking receive() method, and asynchronously using a MessageListener. The nonblocking API is beneficial for many reasons; for example, it holds fewer resources like threads and stack memory. Also, it aligns beautifully with the Rx style of programming. Rather than creating a MessageListener instance and calling our subscriber from within it, we can use this terse syntax with a method reference:
Also, we must take care of resource cleanup and proper error handling. This tiny transformation layer allows us to easily consume JMS messages without worrying about API internals. Here is an example using the popular ActiveMQ messaging broker running locally:
JMS, just like JDBC, has a reputation for heavily using the checked JMSException, even when calling getText() on a TextMessage. To properly handle errors (see “Error Handling” for more details) we use flatMap() and wrap exceptions. From that point, you can treat JMS messages flowing in like any other asynchronous and nonblocking stream. By the way, we used the cast() operator, which optimistically casts upstream events to a given type and fails the stream with an error if an event is not of that type. cast() is basically a specialized map() operator that behaves like map(x -> (TextMessage)x).
The worst blocking API that you can work with requires polling for changes. It provides no mechanism to push changes to you, neither via callbacks nor by blocking indefinitely. The only thing this API offers is asking for the current state, and it is up to you to figure out whether it differs from the previous state. RxJava has a few really powerful operators that you can apply to retrofit such an API to Rx style.
The first case I want you to consider is a simple method that delivers a single value representing state, for example long getOrderBookLength(). To track changes we must call this method frequently enough and capture differences. You can achieve this in RxJava with a very basic operator composition:
First, we produce a synthetic long value every 10 milliseconds, which serves as a basic ticking counter. For each such value (that is, every 10 milliseconds), we call getOrderBookLength(). However, the aforementioned method doesn’t change that often, and we don’t want to flood our subscribers with lots of irrelevant state changes.
Luckily, we can simply say distinctUntilChanged() and RxJava will transparently skip long values returned by getOrderBookLength() that have not changed since the last invocation, as demonstrated in the following marble diagram:
We can apply this pattern even further. Imagine that you are watching for filesystem or database table changes. The only mechanism at your disposal is taking a current snapshot of files or database records. You are building an API that will notify clients about every new item. Obviously, you can use java.nio.file.WatchService or database triggers, but take this as an educational example.
The distinct() operator keeps a record of all items that passed through it. If the same item appears a second time, it is simply ignored. That is why we can push the same list of Items every second. The first time, they are pushed downstream to all subscribers. However, when the exact same list appears one second later, all items were already seen and are therefore discarded. If at some point the list returned from query() contains one extra Item, distinct() will let it through but discard it the next time. This simple pattern allows us to replace a bunch of Thread.sleep() invocations and manual caching with periodic polling. It is applicable in many areas, like File Transfer Protocol (FTP) polling, web scraping, and so on.
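Both filters are simple enough to model in plain Java, which makes the difference easy to see: distinctUntilChanged() compares each value only with its predecessor, while distinct() remembers everything it has ever emitted. The sketch below is not RxJava code; the class PollingFilters and the sample readings are hypothetical, and in a real pipeline the values would come from the periodic getOrderBookLength() or query() calls described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical plain-Java models of the two filters discussed above (not RxJava code).
class PollingFilters {

    // Like distinctUntilChanged(): drop a value only if it equals the one
    // polled immediately before it.
    static <T> List<T> distinctUntilChanged(List<T> polled) {
        List<T> emitted = new ArrayList<>();
        T previous = null;
        boolean first = true;
        for (T current : polled) {
            if (first || !Objects.equals(current, previous)) {
                emitted.add(current);
            }
            previous = current;
            first = false;
        }
        return emitted;
    }

    // Like distinct(): remember every value ever emitted and drop repeats.
    // The set of seen values grows for as long as the stream lives.
    static <T> List<T> distinct(List<T> polled) {
        Set<T> seen = new HashSet<>();
        List<T> emitted = new ArrayList<>();
        for (T current : polled) {
            if (seen.add(current)) {   // add() returns false for duplicates
                emitted.add(current);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical order book lengths sampled every 10 ms
        List<Long> lengths = Arrays.asList(5L, 5L, 7L, 7L, 5L, 9L);
        System.out.println(distinctUntilChanged(lengths)); // [5, 7, 5, 9]

        // Hypothetical query() snapshots, flattened one after another
        List<String> items = Arrays.asList(
                "a", "b",        // first poll
                "a", "b",        // second poll: nothing new
                "a", "b", "c");  // third poll: one extra item
        System.out.println(distinct(items));              // [a, b, c]
    }
}
```

Note the second 5 in the first example: distinctUntilChanged() lets it through because it follows a 7, whereas distinct() would drop it. The ever-growing seen set in distinct() is worth keeping in mind for long-running polls over large collections.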
One of the hallmarks of RxJava is declarative concurrency, as opposed to imperative concurrency. Manually creating and managing threads is a thing of the past; most of us already use managed thread pools (e.g., with ExecutorService). But RxJava goes one step further: Observable can be nonblocking just like CompletableFuture in Java 8, but unlike the latter, it is also lazy. Unless you subscribe, a well-behaving Observable will not perform any action. But the power of Observable goes even beyond that.
An asynchronous Observable is the one that calls your Subscriber’s callback methods (like onNext()) from a different thread. Recall “Mastering Observable.create()”, in which we explored when subscribe() is blocking, waiting until all notifications arrive? In real life, most Observables come from sources that are asynchronous by their nature.
A blocking subscribe() method happens very rarely, when a lambda within Observable.create() is not backed by any asynchronous process or stream. However, by default (with create()) everything happens in the client thread (the one that subscribed). If you just call onNext() directly within your create() callback, no multithreading or concurrency is involved whatsoever.
When encountering such an unusual Observable, we can declaratively select the so-called Scheduler that will be used to emit values. In the case of CompletableFuture, we have no control over the underlying threads: the API made the decision, and in the worst case it is impossible to override it. RxJava rarely makes such decisions alone and chooses a safe default: the client thread and no multithreading involved. For the purposes of this chapter, we will use a really simple logging “library,” which prints a message along with the current thread and the number of milliseconds since the start of the program.
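A minimal version of such a helper might look like the following. The class name ThreadLog and the tab-separated layout are assumptions for illustration, not necessarily the exact helper used in the examples of this chapter.

```java
// Hypothetical logging helper: elapsed milliseconds since class initialization,
// the current thread name, and the label, separated by a tab and a pipe.
class ThreadLog {

    private static final long START = System.currentTimeMillis();

    static String format(Object label) {
        return (System.currentTimeMillis() - START) + "\t| "
                + Thread.currentThread().getName() + "\t| "
                + label;
    }

    static void log(Object label) {
        System.out.println(format(label));
    }

    public static void main(String[] args) throws InterruptedException {
        log("Starting");
        Thread other = new Thread(() -> log("Hello from another thread"), "Sched-A-0");
        other.start();
        other.join();
        log("Exiting");
    }
}
```

Printing the thread name on every line is what makes the scheduler experiments in the rest of this chapter readable.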
RxJava is concurrency-agnostic, and as a matter of fact it does not introduce concurrency on its own. However, some abstractions to deal with threads are exposed to the end user. Also, certain operators cannot work properly without concurrency; see “Other Uses for Schedulers” for some of them. Luckily, the Scheduler class, the only one you must pay attention to, is fairly simple. In principle, it works similarly to the executors from java.util.concurrent: it executes arbitrary blocks of code, possibly in the future. However, to meet the Rx contract, it offers some more fine-grained abstractions, described in the advanced section “Scheduler implementation details overview”.
Schedulers are used together with the subscribeOn() and observeOn() operators, as well as when creating certain types of Observables. A scheduler only creates instances of Workers that are responsible for scheduling and running code. When RxJava needs to schedule some code, it first asks the Scheduler to provide a Worker and uses the latter to schedule subsequent tasks. You will find examples of this API later on, but first familiarize yourself with the available built-in schedulers:
Schedulers.newThread() simply starts a new thread every time it is requested via subscribeOn() or observeOn(). newThread() is hardly ever a good choice, not only because of the latency involved when starting a thread, but also because this thread is not reused. Stack space must be allocated up front (typically around one megabyte, as controlled by the -Xss parameter of the JVM) and the operating system must start a new native thread. When the Worker is done, the thread simply terminates. This scheduler can be useful only when tasks are coarse-grained: they take a lot of time to complete, but there are very few of them, so threads are unlikely to be reused at all. In practice, Schedulers.io(), described next, is almost always a better choice.
Schedulers.io() is similar to newThread(), but already started threads are recycled and can possibly handle future requests. This implementation works similarly to a thread pool executor from java.util.concurrent with an unbounded pool of threads. Every time a new Worker is requested, either a new thread is started (and later kept idle for some time) or an idle one is reused.
The name io() is not a coincidence. Consider using this scheduler for I/O-bound tasks, which require very little CPU. However, they tend to take quite some time, waiting for network or disk. Thus, it is a good idea to have a relatively big pool of threads. Still, be careful with unbounded resources of any kind: in case of slow or unresponsive external dependencies like web services, the io() scheduler might start an enormous number of threads, leading to your very own application becoming unresponsive as well. See “Managing Failures with Hystrix” for more details on how to tackle this problem.
You should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code (reading from disk or network, sleeping, waiting for a lock, etc.). Because each task executed on this scheduler is supposed to fully utilize one CPU core, executing more such tasks in parallel than there are available cores would not bring much value. Therefore, the computation() scheduler by default limits the number of threads running in parallel to the value returned by Runtime.getRuntime().availableProcessors().
If for some reason you need a different number of threads than the default, you can always use the rx.scheduler.max-computation-threads system property. By using fewer threads, you ensure that one or more CPU cores are always idle, and even under heavy load, the computation() thread pool does not saturate your server. It is not possible to have more computation threads than cores.
The computation() scheduler uses an unbounded queue in front of every thread, so if a task is scheduled but all cores are occupied, it is queued. In case of a load peak, this scheduler will keep the number of threads limited. However, the queue just before each thread will keep growing.
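This queueing behavior can be observed with a plain ThreadPoolExecutor sized to availableProcessors(). Treat it as an analogy only: as described above, the computation() scheduler keeps a queue in front of each thread, whereas this sketch uses a single shared unbounded queue. The class ComputationLikePool and the latch used to keep the threads busy are illustration-only assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy of a computation()-style pool: as many threads as cores,
// with an unbounded queue absorbing whatever does not fit.
class ComputationLikePool {

    // Occupies every thread, submits `extraTasks` more, and reports the queue size.
    static int queuedAfterBurst(int extraTasks) {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                cores, cores, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());            // unbounded queue
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < cores + extraTasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();                 // keep the thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first `cores` tasks each got their own thread; the rest wait here.
            return pool.getQueue().size();
        } finally {
            release.countDown();                         // let everything finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Queued tasks: " + queuedAfterBurst(100));
    }
}
```

However large the burst, the pool never grows beyond the number of cores; only the queue does.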
Schedulers are internally more complex than the executors from java.util.concurrent, so a separate abstraction was needed. But because they are conceptually quite similar, unsurprisingly there is a wrapper that can turn an Executor into a Scheduler using the from() factory method:
I am intentionally using this verbose syntax for creating ExecutorService rather than the simpler version:
Although tempting, the Executors factory class hardcodes several defaults that are impractical or even dangerous in enterprise applications. For example, it uses an unbounded LinkedBlockingQueue that can grow infinitely, resulting in OutOfMemoryError when there is a large number of outstanding tasks. Also, the default ThreadFactory uses meaningless thread names like pool-5-thread-3. Naming threads properly is an invaluable tool when profiling or analyzing thread dumps. Implementing ThreadFactory from scratch is a bit cumbersome, so we used ThreadFactoryBuilder from Guava.
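Because the verbose construction is not reproduced here, the following JDK-only sketch shows the same idea: a ThreadPoolExecutor with a bounded queue, an explicit rejection policy and descriptive thread names. NamedThreadFactory is a hypothetical hand-written stand-in for Guava’s ThreadFactoryBuilder, and the pool and queue sizes are arbitrary assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Guava's ThreadFactoryBuilder: names threads prefix0, prefix1, ...
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        return new Thread(task, prefix + counter.getAndIncrement());
    }
}

class BoundedPools {

    // Fixed thread count and a bounded queue: overload is rejected up front
    // instead of piling up until OutOfMemoryError.
    static ThreadPoolExecutor newBoundedPool(String prefix, int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new NamedThreadFactory(prefix),
                new ThreadPoolExecutor.AbortPolicy());   // throw when full
    }

    static String threadNameOfFirstTask() {
        ThreadPoolExecutor pool = newBoundedPool("Sched-A-", 2, 2);
        try {
            return pool.submit((Callable<String>) () -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Two busy threads plus two queued tasks fill the pool; the fifth is rejected.
    static boolean fifthTaskRejected() {
        ThreadPoolExecutor pool = newBoundedPool("Sched-B-", 2, 2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocker);
            }
            try {
                pool.execute(blocker);
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("First task ran on " + threadNameOfFirstTask()); // Sched-A-0
        System.out.println("Fifth task rejected: " + fifthTaskRejected());  // true
    }
}
```

With RxJava 1.x on the classpath, such an executor could then be wrapped with Schedulers.from(pool). AbortPolicy is only one option; CallerRunsPolicy, for instance, would throttle submitters by running the task on the submitting thread instead.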
If you are interested in tuning and properly utilizing thread pools even further, see “Thread Pool of Connections” and “Managing Failures with Hystrix”. Creating schedulers from an Executor that we consciously configured is advised for projects dealing with high load. However, because RxJava has no control over independently created threads in an Executor, it cannot pin threads (that is, try to keep work of the same task on the same thread to improve cache locality). This Scheduler merely makes sure that a single Scheduler.Worker processes events sequentially.
Schedulers.immediate() is a special scheduler that invokes a task within the client thread in a blocking fashion, rather than asynchronously. Using it is pointless unless some part of your API requires providing a scheduler while you are absolutely fine with the default behavior of Observable, which does not involve any threading at all. In fact, subscribing to an Observable (more on that in a second) via the immediate() Scheduler typically has the same effect as not subscribing with any particular scheduler at all. In general, avoid this scheduler; it blocks the calling thread and is of limited use.
The trampoline() scheduler is very similar to immediate() because it also schedules tasks in the same thread, effectively blocking. However, as opposed to immediate(), the upcoming task is executed when all previously scheduled tasks complete. immediate() invokes a given task right away, whereas trampoline() waits for the current task to finish. Trampoline is a pattern in functional programming that allows implementing recursion without infinitely growing the call stack.
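The difference between the two can be modeled in a few lines of plain Java. This is a simplified, single-threaded sketch of the ordering semantics only, not RxJava’s implementation, and MiniSchedulers with its nested types is hypothetical: the immediate variant runs a task on the spot, nesting calls on the stack, while the trampoline queues tasks scheduled from within a running task and drains them once that task returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified models of immediate() and trampoline().
// Not RxJava's implementation; only the ordering semantics are reproduced.
class MiniSchedulers {

    interface Scheduler {
        void schedule(Runnable task);
    }

    // Runs every task right away, nesting calls on the current stack.
    static final Scheduler IMMEDIATE = Runnable::run;

    // Queues tasks scheduled while another task is running and drains
    // them in FIFO order once the current task returns.
    static final class Trampoline implements Scheduler {
        private final Deque<Runnable> queue = new ArrayDeque<>();
        private boolean draining = false;

        @Override
        public void schedule(Runnable task) {
            queue.addLast(task);
            if (draining) {
                return;                    // the outer loop will pick it up
            }
            draining = true;
            try {
                while (!queue.isEmpty()) {
                    queue.pollFirst().run();
                }
            } finally {
                draining = false;
            }
        }
    }

    // Schedules "Outer", which schedules "Inner", and records the order of events.
    static List<String> run(Scheduler scheduler) {
        List<String> events = new ArrayList<>();
        scheduler.schedule(() -> {
            events.add("Outer start");
            scheduler.schedule(() -> events.add("Inner"));
            events.add("Outer end");
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(IMMEDIATE));        // [Outer start, Inner, Outer end]
        System.out.println(run(new Trampoline())); // [Outer start, Outer end, Inner]
    }
}
```

Because the trampoline drains its queue in a loop, a task that keeps rescheduling itself runs iteratively rather than recursively, which is exactly the stack-saving property mentioned above.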
The output is as expected; you could actually replace schedule() with a simple method invocation:
Inside the Outer block we schedule() the Inner block, which gets invoked immediately, interrupting the Outer task. When Inner is done, control goes back to Outer. Again, this is simply a convoluted way of invoking a task in a blocking manner, indirectly via the immediate() Scheduler. But what happens if we replace Schedulers.immediate() with Schedulers.trampoline()? The output is quite different:
Do you see how Outer manages to complete before Inner even starts? This is because the Inner task was queued inside the Scheduler, which was already occupied by the Outer task. When Outer finished, the first task from the queue (Inner) began. We can go even further to make sure you understand the difference:
The Worker from
Scheduler outputs the following:
Scheduler is used only for testing purposes, and you will never see it in production code. Its main advantage is the ability to arbitrarily advance the clock, simulating time passing by.
A Scheduler not only decouples tasks and their execution (typically by running them in another thread), but also abstracts away the clock, as we will learn in “Virtual Time”. The API of the Scheduler is a bit simpler compared to, for example, the scheduled executors in java.util.concurrent:
When RxJava wants to schedule a task (presumably, but not necessarily, in the background), it must first ask for an instance of Worker. It is the Worker that allows scheduling the task either without any delay or at some point in time. Both Scheduler and Worker have an overridable source of time (the now() method) that they use to determine when a given task is supposed to run. Naively, you can think of a Scheduler as a thread pool and a Worker as a thread inside that pool.
The separation between Scheduler and Worker is necessary to easily implement some of the guidelines enforced by the Rx contract, namely invoking a Subscriber’s methods sequentially, not concurrently. The Worker’s contract provides just that: two tasks scheduled on the same Worker will never run concurrently. However, independent Workers from the same Scheduler can run tasks concurrently just fine.
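A naive plain-Java picture of this contract is a Worker that wraps a shared pool but hands it at most one task at a time, much like the SerialExecutor example in the java.util.concurrent.Executor documentation. SerialWorker and runOnOneWorker() are hypothetical names, and RxJava’s real workers are implemented differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical "Worker" as a serial view over a shared pool (the "Scheduler"):
// tasks submitted to one SerialWorker never overlap, although consecutive
// tasks may run on different pool threads.
class SerialWorker implements Executor {
    private final Executor pool;
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private Runnable active;

    SerialWorker(Executor pool) {
        this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable task) {
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext();          // hand over to the next queued task
            }
        });
        if (active == null) {            // nothing running: start right away
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            pool.execute(active);
        }
    }

    // Submits `count` tasks to one worker backed by a 4-thread pool and returns
    // the highest number of them observed running at the same time.
    static int runOnOneWorker(int count, List<Integer> order) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialWorker worker = new SerialWorker(pool);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final int id = i;
            worker.execute(() -> {
                maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                order.add(id);
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        System.out.println("Max overlap: " + runOnOneWorker(10, order)); // 1
        System.out.println("Order: " + order);
    }
}
```

Two separate SerialWorker instances over the same pool would each keep their own tasks sequential while still running in parallel with each other, mirroring independent Workers from one Scheduler.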
Rather than going through the API, let’s analyze the source code of an existing HandlerScheduler, as found in the RxAndroid project. This Scheduler simply runs all scheduled tasks on the Android UI thread. Updating the user interface is only allowed from that thread.
Details of the Android API are not important at the moment. What happens here is that every time we schedule something on a HandlerWorker, the block of code is passed to a special postDelayed() method that executes it on a dedicated Android thread. There is just one such thread, so events are serialized not only within, but also across Workers.
Before we pass the action to be executed, we wrap it with ScheduledAction, which implements both Runnable and Subscription. RxJava is lazy whenever it can be, and this also applies to scheduling tasks. If for any reason you decide that a given action should not be executed after all (this makes sense when the action was scheduled in the future, not immediately), simply call unsubscribe() on the Subscription returned from schedule(). It is the responsibility of the Worker to properly handle unsubscription (on a best-effort basis, at least).
Client code can also decide to unsubscribe from the Worker in its entirety. This should unsubscribe all queued tasks as well as release the Worker so that the underlying thread can potentially be reused later. The following code snippet enhances the SimplifiedHandlerScheduler by adding the Worker unsubscription flow (only modified methods are included):
CompositeSubscription is one of many available implementations, and it is itself just a container for child Subscriptions (the Composite design pattern). Unsubscribing from a CompositeSubscription means unsubscribing from all of its children. You can also add and remove the children managed by a CompositeSubscription.
In our custom Worker, a CompositeSubscription is used to track all Subscriptions from previous schedule() invocations (see compositeSubscription.add(scheduledAction)). On the other hand, the child ScheduledAction needs to know about its parent (see addParent()) so that it can remove itself when the action is completed or canceled. Otherwise, the Worker would accumulate stale child Subscriptions forever. When the client code decides that it no longer needs a HandlerWorker instance, it unsubscribes from it. The unsubscription is propagated to all (if any) outstanding child Subscriptions.
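The parent/child bookkeeping can be modeled without RxJava as a small composite of cancellable handles. Composite and Handle are hypothetical, single-threaded stand-ins for CompositeSubscription and ScheduledAction; the real classes are thread-safe and considerably more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-threaded model of CompositeSubscription (Composite)
// and ScheduledAction (Handle).
class Composite {

    static final class Handle {
        private Composite parent;
        private boolean cancelled;

        void addParent(Composite parent) {
            this.parent = parent;
        }

        boolean isCancelled() {
            return cancelled;
        }

        // Cancels this action and detaches it from its parent.
        void cancel() {
            cancelled = true;
            detach();
        }

        // Called when the action finishes normally.
        void complete() {
            detach();
        }

        private void detach() {
            if (parent != null) {
                parent.remove(this);     // no stale children left behind
            }
        }
    }

    private final Set<Handle> children = new LinkedHashSet<>();
    private boolean cancelled;

    void add(Handle child) {
        if (cancelled) {
            child.cancel();              // late additions are cancelled right away
            return;
        }
        children.add(child);
        child.addParent(this);
    }

    void remove(Handle child) {
        children.remove(child);
    }

    int size() {
        return children.size();
    }

    // Like unsubscribing the whole Worker: cancels every outstanding child.
    void cancel() {
        cancelled = true;
        List<Handle> snapshot = new ArrayList<>(children);  // children call remove()
        children.clear();
        for (Handle child : snapshot) {
            child.cancel();
        }
    }

    public static void main(String[] args) {
        Composite worker = new Composite();
        Handle finished = new Handle();
        Handle pending = new Handle();
        worker.add(finished);
        worker.add(pending);
        finished.complete();
        System.out.println(worker.size());         // 1
        worker.cancel();
        System.out.println(pending.isCancelled()); // true
    }
}
```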
In “Mastering Observable.create()” we saw that subscribe() by default uses the client thread. To recap, here is the simplest subscription you can come up with, where no threading is involved whatsoever:
Notice where the logging statements are placed and study the output carefully, especially with regard to which thread invoked the print statement:
Pay attention: the order of statements is absolutely predictable. First, every line of code in the preceding code snippet runs in the main thread; there are no thread pools and no asynchronous emission of events involved. Second, the order of execution might not be entirely clear at first sight.
There is an inherent but hidden connection between subscribe() and create(). Every time you call subscribe() on an Observable, its OnSubscribe callback method is invoked (wrapping the lambda expression you passed to create()). It receives your Subscriber as an argument. By default, this happens in the same thread and is blocking, so whatever you do inside create() will block subscribe(). If your create() method sleeps for a few seconds, subscribe() will block. Moreover, if there are operators between Observable.create() and your Subscriber (a lambda acting as callback), all these operators are invoked on behalf of the thread that invoked subscribe(). RxJava does not inject any concurrency facilities between Observable and Subscriber by default. The reason is that Observables tend to be backed by other concurrency mechanisms like event loops or custom threads, so Rx lets you take full control rather than imposing any convention.
This observation prepares the landscape for the subscribeOn() operator. By inserting subscribeOn() anywhere between an original Observable and subscribe(), you declaratively select the Scheduler where the OnSubscribe callback method will be invoked. No matter what you do inside create(), this work is offloaded to an independent Scheduler and your subscribe() invocation no longer blocks:
Do you see how the main thread exits before the Observable even begins emitting any values? Technically, the order of log messages is no longer that predictable because two threads are running concurrently: main, which subscribed and wants to exit, and Sched-A-0, which emits events as soon as someone subscribes. schedulerA and the Sched-A-0 thread come from the following sample schedulers we built for illustration purposes:
These schedulers will be used across all examples, but they are fairly easy to remember: three independent schedulers, each managing 10 threads from an ExecutorService. To make the output nicer, each thread pool has a distinct naming pattern.
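To see the subscribeOn() mechanism in isolation, here is a deliberately tiny model: subscribe() runs the producing callback synchronously on the caller’s thread, and a subscribeOn()-like wrapper merely moves that call onto a named executor thread. MiniSource and its methods are hypothetical; real Observables also deal with unsubscription, errors and backpressure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical miniature of Observable.create()/subscribe()/subscribeOn().
class MiniSource<T> {

    // Counterpart of the OnSubscribe lambda: pushes values to a consumer.
    private final Consumer<Consumer<T>> onSubscribe;

    MiniSource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Like subscribe(): runs the producer right here, blocking the caller.
    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }

    // Like subscribeOn(): a new source whose subscribe() hands the whole
    // production over to the executor and returns immediately.
    MiniSource<T> subscribeOn(ExecutorService executor) {
        return new MiniSource<>(subscriber ->
                executor.execute(() -> onSubscribe.accept(subscriber)));
    }

    // Returns the name of the thread that produced the value.
    static String producerThread(boolean offload) {
        MiniSource<String> source = new MiniSource<>(
                subscriber -> subscriber.accept(Thread.currentThread().getName()));
        ExecutorService schedulerA = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "Sched-A-0"));
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            MiniSource<String> chosen = offload ? source.subscribeOn(schedulerA) : source;
            chosen.subscribe(result::complete);
            return result.join();
        } finally {
            schedulerA.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Without subscribeOn: " + producerThread(false)); // caller thread
        System.out.println("With subscribeOn:    " + producerThread(true));  // Sched-A-0
    }
}
```

The wrapper never touches the producing logic itself, which is why the choice of concurrency can be left to the client code.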
Before we begin, you must understand that in mature applications, in terms of Rx adoption, subscribeOn() is very seldom used. Normally, Observables come from sources that are naturally asynchronous (like RxNetty, see “Nonblocking HTTP Server with Netty and RxNetty”) or apply scheduling on their own (like Hystrix, see “Managing Failures with Hystrix”). You should use subscribeOn() only in special cases when the underlying Observable is known to be synchronous (create() being blocking). However, subscribeOn() is still a much better solution than hand-crafted threading within create():
The preceding code mixes two concepts: producing events and choosing a concurrency strategy. An Observable should be responsible only for production logic, whereas only the client code can make a judicious decision about concurrency. Remember that an Observable is lazy but also immutable, in the sense that subscribeOn() affects only downstream subscribers; if someone subscribes to the exact same Observable without subscribeOn() in between, no concurrency will be involved by default.
Keep in mind that in this chapter our focus is on existing applications and introducing RxJava gradually. The subscribeOn() operator is quite useful in such circumstances; however, after you grasp reactive extensions and begin using them on a large scale, the value of subscribeOn() diminishes. In entirely reactive software stacks, as found for example at Netflix, subscribeOn() is almost never used, yet all Observables are asynchronous. Most of the time, Observables come from asynchronous sources and they are treated as asynchronous by default. Therefore, the use of subscribeOn() is very limited, mostly to retrofitting existing APIs or libraries.
There are several nuances regarding how subscribeOn() works. First, the curious reader might wonder what happens if two invocations of subscribeOn() appear between Observable and subscribe(). The answer is simple: the subscribeOn() closest to the original Observable wins.
This has important practical implications. If you are designing an API and you use subscribeOn() internally, the client code has no way of overriding the Scheduler of your choice. This can be a conscious design decision; after all, the API designer might know best which Scheduler is appropriate. On the other hand, providing an overloaded version of said API that allows overriding the chosen Scheduler is always a good idea.
The output reveals only schedulerA’s threads:
Interestingly, subscribing on schedulerB is not entirely ignored in favor of schedulerA. schedulerB is still used for a short period of time, but it merely schedules a new action on schedulerA, which does all the work. Thus, multiple subscribeOn() calls are not only ignored, but also introduce a small overhead.
Speaking of operators, we said that the create() method used when there is a new Subscriber is executed within the provided scheduler (if any). But which thread executes all the transformations happening between create() and subscribe()? We already know that, by default, all operators are executed in the same thread (scheduler), so no concurrency is involved:
We sprinkled the pipeline of operators with occasional doOnNext() calls to see which thread is in control at each point. Remember that the position of subscribeOn() is not relevant: it can be right after the Observable or just before subscribe(). The output is unsurprising:
create() is invoked and produces A and B events. These events travel sequentially through the scheduler’s thread to finally reach the Subscriber. Many newcomers to RxJava believe that using a Scheduler with a large number of threads will automatically fork processing of events concurrently and somehow join all the results together in the end. This is not the case. RxJava creates a single Worker instance for the entire pipeline, mostly to guarantee sequential processing of events.
This means that if one of your operators is particularly slow (for example, map() reading data from disk in order to transform events passing by), this costly operation will be invoked within the same thread. A single broken operator can slow down the entire pipeline, from production to consumption. This is an antipattern in RxJava: operators should be nonblocking, fast, and as pure as possible.
flatMap() comes to the rescue. Rather than blocking within map(), we can invoke flatMap() and asynchronously collect all the results. Therefore, flatMap() and merge() are the operators to reach for when we want to achieve true parallelism. But even with flatMap(), parallelism is not obvious. Imagine a grocery store (let’s call it “RxGroceries”) that provides an API for purchasing goods:
Obviously, the implementation of doPurchase() is irrelevant here; just imagine it takes some time and resources to complete. We simulate business logic by adding an artificial sleep of one second, slightly longer if quantity is bigger. Blocking Observables like the one returned from purchase() are unusual in a real application, but let’s keep it this way for educational purposes. When purchasing several goods, we would like to parallelize as much as possible and calculate the total price for all goods in the end. The first attempt is fruitless:
The result is correct: it is an Observable with just a single value, the total price, calculated using reduce(). For each product, we invoke doPurchase() with quantity one. However, despite using schedulerA backed by a thread pool of 10, the code is entirely sequential:
Notice how each product blocks subsequent ones from processing. When the purchase of bread is done, butter begins immediately, but not earlier. Strangely, even replacing map() with flatMap() does not help, and the output is exactly the same:
The code does not work concurrently because there is just a single flow of events, which by design must run sequentially. Otherwise, your Subscriber would need to be aware of concurrent notifications (onNext(), onCompleted(), etc.), so it is a fair compromise. Luckily, the idiomatic solution is very close. The main Observable emitting products cannot be parallelized. However, for each product, we create a new, independent Observable as returned from purchase(). Because they are independent, we can safely schedule each one of them concurrently:
The main Observable is not really doing anything, so a special thread pool is unnecessary. However, each substream created within flatMap() is subscribed on schedulerA via subscribeOn(). Every time subscribeOn() is used, the Scheduler gets a chance to return a new Worker, and therefore a separate thread (simplifying a bit):
Finally, we achieved true concurrency. Each purchase operation now begins at the same time, and they all eventually finish. The flatMap() operator is carefully designed and implemented so that it collects all events from all independent streams and pushes them downstream sequentially. However, as we already learned in “Order of Events After flatMap()”, we can no longer rely on the order of downstream events: they neither begin nor complete in the same order as they were emitted (the original sequence began at bread). When events reach the reduce() operator, they are already sequential and well behaving.
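For readers more at home with java.util.concurrent, the fan-out-and-reduce shape of this pipeline can be approximated with CompletableFuture: each purchase is forked onto a pool, then all results are joined and summed. This is an analogy rather than an RxJava equivalent (a CompletableFuture starts eagerly, whereas an Observable is lazy), and doPurchase() below is a hypothetical stand-in that sleeps briefly and returns a made-up price.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ParallelPurchases {

    // Hypothetical blocking purchase: sleeps a little and costs 1 per item.
    static BigDecimal doPurchase(String product, int quantity) {
        try {
            Thread.sleep(100L + quantity * 10L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return BigDecimal.valueOf(quantity);
    }

    static BigDecimal totalPrice(List<String> products, ExecutorService pool) {
        // Fork: start every purchase independently before waiting for any of them
        List<CompletableFuture<BigDecimal>> purchases = products.stream()
                .map(p -> CompletableFuture.supplyAsync(() -> doPurchase(p, 1), pool))
                .collect(Collectors.toList());
        // Join: wait for all results and reduce them on the calling thread
        return purchases.stream()
                .map(CompletableFuture::join)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            long start = System.currentTimeMillis();
            BigDecimal total = totalPrice(
                    Arrays.asList("bread", "butter", "milk", "tomato", "cheese"), pool);
            System.out.println(total + " in " + (System.currentTimeMillis() - start) + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Collecting the futures into a list before joining them is essential. Joining inside the same lazy stream would submit and wait for one purchase at a time, reproducing the sequential behavior of the first attempt above.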
By now, you should slowly move away from the classic Thread model and understand how Schedulers work. But if you find it difficult, here is a simple analogy:
An Observable without any Scheduler works like a single-threaded program with blocking method calls passing data between one another.
An Observable with a single subscribeOn() is like starting a big task in a background Thread. The program within that Thread is still sequential, but at least it runs in the background.
An Observable using flatMap() where each internal Observable has its own subscribeOn() works like the fork/join framework from java.util.concurrent, where each substream is a fork of execution and flatMap() is a safe join stage.
Of course, the preceding tips only apply to blocking Observables, which are rarely seen in real applications. If your underlying Observables are already asynchronous, achieving concurrency is a matter of understanding how they are combined and when subscription occurs. For example, merge() on two streams will subscribe to both of them concurrently, whereas the concat() operator waits until the first stream finishes before it subscribes to the second one.
Did you notice that purchase() accepts a quantity even though the quantity was always one? What if our grocery list had some products multiple times, indicating bigger demand? The first naive implementation simply sends the same request (for example, for egg) multiple times, each time asking for one. Fortunately, we can declaratively batch such requests by using groupBy(), and this still works with declarative concurrency:
At this point, within the first flatMap() we need to construct an Observable of type Pair<String, Integer>, which represents every unique product and its quantity. Both count() and map() return an Observable, so everything lines up perfectly. The second flatMap() receives an order of type Pair<String, Integer> and makes a purchase; this time the quantity can be bigger. The output looks perfect. Notice that bigger orders are slightly slower, but it is still much faster than sending several repeated requests:
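The batching step itself, grouping identical products and counting each group, has a direct counterpart in plain Java streams. The snippet below shows only that data transformation; it involves no concurrency, is not RxJava code, and GroceryBatching and the sample list are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class GroceryBatching {

    // Hypothetical helper: turns a list with repeated products into
    // (product -> quantity) orders, like the groupBy()/count() stage.
    static Map<String, Long> batch(List<String> products) {
        return products.stream()
                .collect(Collectors.groupingBy(
                        Function.identity(),
                        LinkedHashMap::new,      // keep first-seen order
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList(
                "bread", "butter", "egg", "milk", "tomato",
                "cheese", "tomato", "egg", "egg");
        // Each entry would become a single purchase(product, quantity) call
        batch(list).forEach((product, quantity) ->
                System.out.println(product + " x" + quantity));
    }
}
```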
Believe it or not, concurrency in RxJava can be described by two operators: the aforementioned subscribeOn() and observeOn(). They seem very similar and are confusing to newcomers, but their semantics are actually quite clear and reasonable.
subscribeOn() allows choosing which Scheduler will be used to invoke OnSubscribe (the lambda expression inside create()). Therefore, any code inside create() is pushed to a different thread, for example, to avoid blocking the main thread. Conversely, observeOn() controls which Scheduler is used to invoke downstream Subscribers occurring after observeOn().
For example, calling create() happens in the io() Scheduler (via subscribeOn(io())) to avoid blocking the user interface. However, updating the user interface widgets must happen in the UI thread (both Swing and Android have this constraint), so we use observeOn(), for example with AndroidSchedulers.mainThread(), before operators or subscribers changing the UI. This way we can use one Scheduler to handle create() and all operators up to the first observeOn(), but other(s) to apply transformations. This is best explained with an example:
The observeOn() operator occurs somewhere in the pipeline chain, and this time, as opposed to subscribeOn(), the position of observeOn() is quite important. No matter what Scheduler was running operators above observeOn() (if any), everything below uses the supplied Scheduler. In this example, there is no subscribeOn(), so the default is applied (no concurrency):
All of the operators above observeOn() are executed within the client thread, which happens to be the default in RxJava. But below observeOn(), the operators are executed within the supplied Scheduler. This will become even more obvious when both subscribeOn() and multiple observeOn() calls occur within the pipeline:
Can you predict the output? Remember, everything below observeOn() is run within the supplied Scheduler, of course until another observeOn() is encountered. Additionally, subscribeOn() can occur anywhere between the Observable and subscribe(), but this time it only affects operators down to the first observeOn():
Subscription occurs in schedulerA because that is what we specified in subscribeOn(). Also, the “Found 1” operator was executed within that Scheduler because it is before the first observeOn(). Later, the situation becomes more interesting.
observeOn() switches the current Scheduler to schedulerB, and “Found 2” is using this one instead. The last observeOn(schedulerC) affects both the “Found 3” operator and the Subscriber. Remember that the Subscriber works within the context of the last encountered observeOn().
subscribeOn() and observeOn() work really well together when you want to physically decouple the producer (Observable.create()) and the consumer (Subscriber). By default, there is no such decoupling, and RxJava simply uses the same thread. Using only subscribeOn() is not enough: we simply choose a different thread. Using only observeOn() is better, but then we block the client thread in the case of synchronous Observables. Because most of the operators are nonblocking and the lambda expressions used inside them tend to be short and cheap, typically there is just one subscribeOn() and one observeOn() in the pipeline of operators.
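Outside RxJava, the producer/consumer decoupling that subscribeOn() and observeOn() provide resembles handing work between two executors: values are produced on one thread, and each one is re-posted to a second, single-threaded executor where the consumer runs. Handoff and the thread names are hypothetical, and this sketch ignores errors, completion and backpressure.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java analogy of subscribeOn(A) + observeOn(C): production runs
// on one executor, and every value hops to a single-threaded consumer executor,
// which delivers values one at a time and in order.
class Handoff {

    static List<String> run(int count) {
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "Sched-A-0"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "Sched-C-0"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch consumed = new CountDownLatch(count);
        try {
            producer.execute(() -> {                 // the subscribeOn()-like part
                for (int i = 0; i < count; i++) {
                    final int value = i;
                    log.add("produced " + value + " on " + Thread.currentThread().getName());
                    consumer.execute(() -> {         // the observeOn()-like hop
                        log.add("consumed " + value + " on " + Thread.currentThread().getName());
                        consumed.countDown();
                    });
                }
            });
            consumed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        synchronized (log) {
            return new ArrayList<>(log);
        }
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```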
subscribeOn() can be placed close to the original Observable to improve readability, whereas observeOn() is close to subscribe() so that only the Subscriber uses that special Scheduler; other operators rely on the Scheduler from subscribeOn().
store() is a simple nested operation:
The production of events occurs in schedulerA, but each event is processed independently using schedulerB to improve concurrency, a technique we learned in “subscribeOn() Concurrency and Behavior”. The subscription in the end happens in yet another scheduler, schedulerC. We are pretty sure you understand by now which Scheduler/thread will execute which action, but just in case (empty lines added for clarity):
observeOn() is especially important for applications with a UI, for which we do not want to block the UI event-dispatching thread. On Android or Swing, some actions like updating the UI must be executed within a specific thread. But doing too much in that thread renders your UI unresponsive. In these cases, you put observeOn() close to subscribe() so that code within the subscription is invoked within the context of a particular Scheduler (like the UI thread).
However, other transformations, even rather cheap, should be executed outside UI thread. On the server,
observeOn() is seldom used because the true source of concurrency is built into most
Observables. This leads to an interesting conclusion: RxJava controls concurrency with just two operators (
observeOn()), but the more you use reactive extensions, the less frequently you will see these in production code.
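The sketch below illustrates the UI pattern without an actual UI toolkit. It assumes RxJava 1.x and uses a single named thread as a hypothetical stand-in for the event-dispatching thread. The transformation runs on Schedulers.computation(). Only the Subscriber, placed after observeOn(), runs on the "UI" thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

class UiThreadSketch {

    // Returns {thread that ran the transformation, "subscriberThread:result"}.
    static String[] run() {
        // Hypothetical stand-in for a UI event-dispatching thread.
        ExecutorService uiExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "ui-thread");
            t.setDaemon(true);
            return t;
        });
        Scheduler uiScheduler = Schedulers.from(uiExecutor);
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);

        Observable.just(21)
            .subscribeOn(Schedulers.computation())   // work starts off the UI thread
            .map(x -> {
                threads[0] = Thread.currentThread().getName();
                return x * 2;                         // transformation runs on a computation thread
            })
            .observeOn(uiScheduler)                   // close to subscribe(): only the Subscriber hops to "ui-thread"
            .subscribe(
                result -> threads[1] = Thread.currentThread().getName() + ":" + result,
                error -> done.countDown(),
                done::countDown);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        uiExecutor.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println(threads[0] + " then " + threads[1]);
    }
}
```

On Android, RxAndroid's AndroidSchedulers.mainThread() plays the role of uiScheduler. For Swing, one option is Schedulers.from(SwingUtilities::invokeLater), because invokeLater(Runnable) matches the Executor interface.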
There are numerous operators that use some Scheduler by default. Typically, Schedulers.computation() is used if none is supplied; the JavaDoc always makes this clear. For example, the delay() operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler.
Without supplying a custom Scheduler such as schedulerA to delay(), all operators below delay() would use the computation() Scheduler. There is nothing inherently wrong with that. However, if your Subscriber is blocked on I/O, it would consume one Worker from the globally shared computation() Scheduler, possibly affecting the entire system.
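Here is a small sketch of passing an explicit Scheduler to delay(), assuming RxJava 1.x. The executor behind schedulerA is a hypothetical single named thread. This lets you check that the delayed emissions, and therefore the Subscriber, run there rather than on a computation() thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

class DelayWithScheduler {

    static List<String> run() {
        // Hypothetical single-threaded executor whose thread is named "schedulerA".
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "schedulerA");
            t.setDaemon(true);
            return t;
        });
        Scheduler schedulerA = Schedulers.from(executor);
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Observable
            .just('A', 'B')
            .delay(100, TimeUnit.MILLISECONDS, schedulerA) // waits on schedulerA, not on computation()
            .subscribe(
                ch -> seen.add(ch + "@" + Thread.currentThread().getName()),
                error -> done.countDown(),
                done::countDown);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```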
Other important operators that support a custom Scheduler include timeout() and several others that have yet to be introduced. If you do not provide a Scheduler to such operators, the computation() Scheduler is used, which is a safe default in most cases.
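One practical consequence of these operators accepting a Scheduler is testability. The sketch below assumes RxJava 1.x and passes a TestScheduler to timeout(). The TestScheduler's virtual clock only moves when advanceTimeBy() is called, so the timeout can be verified without actually waiting a second. The class name is illustrative.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.schedulers.TestScheduler;

class TimeoutWithVirtualTime {

    // Returns {error seen just before the deadline, error seen at the deadline}.
    static Throwable[] run() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock, advanced manually
        AtomicReference<Throwable> error = new AtomicReference<>();

        Observable.<String>never()                     // never emits, so the timeout must fire
            .timeout(1, TimeUnit.SECONDS, scheduler)
            .subscribe(value -> { }, error::set);

        scheduler.advanceTimeBy(999, TimeUnit.MILLISECONDS);
        Throwable beforeDeadline = error.get();        // still null: 999 ms is less than 1 s
        scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        Throwable atDeadline = error.get();            // timeout reached, onError has been called
        return new Throwable[] {beforeDeadline, atDeadline};
    }

    public static void main(String[] args) {
        Throwable[] r = run();
        System.out.println(r[0] + " / " + (r[1] instanceof TimeoutException));
    }
}
```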
Mastering schedulers is essential to writing scalable and safe code using RxJava. The difference between subscribeOn() and observeOn() is especially important under high load, where every task must be executed precisely when we expect. In truly reactive applications, in which all long-running operations are asynchronous, very few threads, and thus few Schedulers, are needed. But there is always that one API or dependency that requires blocking code.
This chapter described several patterns in traditional applications that can be replaced with RxJava. I hope you understand by now that high-frequency trading and streaming posts from social media are not the only use cases for RxJava. As a matter of fact, almost any API can be seamlessly replaced with Observable. Even if you don’t want or need the power of reactive extensions at the moment, doing so allows you to evolve the implementation without introducing backward-incompatible changes.
Moreover, it is the client that eventually harvests all the possibilities offered by RxJava, like laziness, declarative concurrency, or asynchronous chaining. Even better, because of the seamless conversion from Observable to BlockingObservable, traditional clients can consume your API as they want, and you can always provide a simple bridge layer.
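Here is a minimal sketch of such a bridge layer, assuming RxJava 1.x. The listPeople() method and its hard-coded names are hypothetical. The reactive method stays the primary API. A thin blocking wrapper uses toList() and toBlocking() to serve callers that still expect a List.

```java
import java.util.List;
import rx.Observable;

class BlockingBridge {

    // Hypothetical reactive API: its implementation is free to become asynchronous later.
    static Observable<String> listPeople() {
        return Observable.just("Alice", "Bob");
    }

    // Thin bridge for traditional callers that still expect a List.
    static List<String> listPeopleBlocking() {
        return listPeople()
            .toList()      // Observable<List<String>> emitting one list on completion
            .toBlocking()  // BlockingObservable<List<String>>
            .single();     // waits for and returns that single list
    }

    public static void main(String[] args) {
        System.out.println(listPeopleBlocking());
    }
}
```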
You should now be fairly confident with RxJava and understand the benefits of applying it even in legacy systems. Undoubtedly, working with reactive Observables is more challenging and has a somewhat steep learning curve. But the advantages and possibilities of growth can hardly be overstated. Imagine writing entire applications using reactive extensions, from top to bottom, like a greenfield project for which we have control over every API, interface, and external system. The next chapter discusses how you can write such an application and what the implications are.