To make the best of reactive programming, we must dive a little bit deeper. This chapter focuses on a few nontrivial but important aspects and principles, among them:
- Declarative error handling, including retries (see “Error Handling”)
- Virtual time and testing (see “Virtual Time”)
- Monitoring and debugging of your Observable streams (see “Monitoring and Debugging”)
Understanding a library or framework is not enough to successfully deploy it to production. The aforementioned aspects are crucial if you want to build solid, stable, and resilient applications.
The Reactive Manifesto enumerates four traits that reactive systems should embrace. Such systems should be: responsive, resilient, elastic, and message driven. Let’s take a look at a couple of these:
Responsive: “The system responds in a timely manner if at all possible. […] responsiveness means that problems may be detected quickly and dealt with effectively. […] rapid and consistent response times […] simplifies error handling.”
Resilient: “The system stays responsive in the face of failure. […] parts of the system can fail and recover without compromising the system as a whole. […] The client of a component is not burdened with handling its failures.”
This section explains why the first two, responsiveness and resiliency, are important and how RxJava helps to achieve them. You are already familiar with the onError() callback when subscribing to an Observable. But this is just the tip of the iceberg and often not the best approach to handle errors.
Traditionally in Java, errors are indicated by using exceptions. There are two flavors of exceptions in this language:
- Unchecked exceptions, which are not required in method declarations. If a method throws an unchecked exception (like NullPointerException), it can indicate this in its declaration, but it is not obligatory.
- Checked exceptions, which must be declared and handled in order for the code to compile. Basically, this is every Throwable that does not extend RuntimeException or Error.
There are pros and cons to both of these types of traditional error handling. Unchecked exceptions are easy to add and do not break compile-time backward compatibility. Also, with unchecked exceptions, client code seems cleaner because it does not need to deal with error handling (although it can).
Checked exceptions, on the other hand, are more explicit about the outcome that we can expect from a method. Of course, every method can throw other arbitrary types but checked exceptions are considered part of the API and suggest errors that must be handled explicitly.
Although checked exceptions are impossible to miss and might seem superior in terms of writing error-free code, they proved to be quite unwieldy and obscure. Even official Java APIs are migrating to unchecked exceptions; for example, the old JMSException (checked) versus the new JMSRuntimeException (unchecked) in JMS 2.0.
RxJava takes an entirely different approach. First, in standard Java, exceptions are a new dimension in the type system. There is a method return type and there are exceptions that are completely orthogonal. When a method opens a File, it can either return
InputStream or throw
FileNotFoundException. But what if
FileNotFoundException is not declared? Or are there any other exceptions we should expect? Exceptions are like an alternative execution path, as if failures were always unexpected and never part of ordinary business flow.
In RxJava, failures are just another type of notification. Every
Observable<T> is a sequence of events of type
T, optionally followed by a completion or error notification. This means that errors are implicitly a part of every stream, and even though we are not required to handle them, there are plenty of operators that declaratively handle errors in a more elegant way. Also, a try-catch block wrapped around an Observable will not capture any errors; they are only propagated through the aforementioned error notifications.
But, before we explore a handful of RxJava operators that declaratively handle errors, we must first understand the heuristics that apply when errors are not handled at all. In Java, exceptions can occur almost everywhere, and library creators must ensure they are appropriately handled or at least reported if not otherwise dealt with. The most common problem is a subscribe() that does not define an onError() callback. Imagine an Observable whose create() forcibly throws an ArithmeticException, so that onError() is invoked on each Subscriber, while subscribe() does not provide an onError() implementation. Fortunately, RxJava tries to save the day by throwing an OnErrorNotImplementedException wrapping the original ArithmeticException. But which thread throws this exception? This is a difficult question. If the Observable is synchronous, the client thread indirectly invokes create() and thus throws OnErrorNotImplementedException in case of an unhandled onError(). This means that the thread that invoked subscribe() will receive the exception.
The situation becomes more complex if you forget to subscribe for errors and the Observable is asynchronous. In that case, the thread that invoked subscribe() might be long gone when OnErrorNotImplementedException is thrown. Under these circumstances, the exception is thrown from whichever thread was about to invoke the onError() callback. This can be a thread from the Scheduler selected via subscribeOn() or the last observeOn(). The Scheduler is free to manage such an unexpected exception in any way it likes; most of the time it simply prints a stack trace to the standard error stream. This is far from perfect: such exceptions bypass your normal logging code and in a worst-case scenario can go unnoticed. Therefore, a subscribe() listening only for values and not errors is often a bad sign of possibly missed errors. Even if you do not expect any exceptions to happen (which is rarely the case), at least place error logging that plugs into your logging framework:
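A minimal sketch of such a defensive subscription; the failing numbers() source and the console logging destination are placeholder assumptions, not the book's original listing:

```java
import rx.Observable;

public class ErrorAwareSubscribe {

    // Hypothetical source that fails on the third element (10 / 0)
    static Observable<Integer> numbers() {
        return Observable.just(1, 2, 0).map(x -> 10 / x);
    }

    public static void main(String[] args) {
        numbers().subscribe(
                value -> System.out.println("Got: " + value),
                // Always supply the second callback; in real code, route it
                // to your logging framework instead of System.err
                error -> System.err.println("Unexpected error: " + error));
    }
}
```

With the error callback in place, the ArithmeticException is reported rather than surfacing as an OnErrorNotImplementedException on some arbitrary thread.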
There are many other places where exceptions can occur and sneak in. First of all, it is a good practice to surround a lambda expression within
create() with a
try-catch block, just like in the previous example:
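A guarded create() could be sketched as follows; the division-based source is a hypothetical example, not the book's listing:

```java
import rx.Observable;

public class GuardedCreate {

    static Observable<Integer> fortyTwoDividedBy(int divisor) {
        return Observable.create(subscriber -> {
            try {
                // Any exception thrown here is routed explicitly to onError()
                subscriber.onNext(42 / divisor);
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        });
    }
}
```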
However, if you forget about the try-catch and let create() throw an exception, RxJava does its best and propagates such an exception as an error notification. The two approaches are semantically equivalent: exceptions thrown from create() are caught internally by RxJava and translated to an error notification. Yet, it is advised to explicitly propagate exceptions via subscriber.onError() if possible. Even better, replace create() with fromCallable(), which translates both a returned value and a thrown exception into the proper notification.
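A sketch of the fromCallable() approach, reusing the same hypothetical division example:

```java
import rx.Observable;

public class FromCallableExample {

    static Observable<Integer> fortyTwoDividedBy(int divisor) {
        // A returned value becomes onNext() + onCompleted();
        // a thrown exception becomes an onError() notification automatically
        return Observable.fromCallable(() -> 42 / divisor);
    }
}
```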
Other places where exceptions can generally occur are any operators that accept user code; in simpler words, any operator that takes a lambda expression as an argument, like map(), filter(), zip(), and many, many more. These operators must not only deal with error notifications coming from an upstream Observable, but also with exceptions thrown from custom mapping functions or predicates. Take this broken mapping and filtering as an example:
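The broken operators could look like the following sketch; the exact sources are hypothetical stand-ins for the stripped listing:

```java
import rx.Observable;

public class BrokenLambdas {

    // Throws ArithmeticException inside map() when x == 0
    static Observable<Integer> brokenMap() {
        return Observable.just(1, 0, 2).map(x -> 10 / x);
    }

    // Throws NullPointerException while the filter() predicate is invoked
    static Observable<String> brokenFilter() {
        return Observable.just("a", null, "b").filter(s -> s.length() > 0);
    }
}
```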
The first example throws the familiar ArithmeticException for some elements. The second example leads to a NullPointerException when the filter() predicate is invoked. All lambda expressions passed to higher-order functions like filter() should be pure, whereas throwing an exception is an impure side effect.
RxJava again does its best to handle unexpected exceptions here, and the behavior is exactly what you would expect: if any operator in the pipeline throws an exception, it is translated to an error notification and passed downstream. Despite RxJava making an effort to fix broken user code, if you suspect your lambda expression can throw an exception, make it explicit by using flatMap(). Remember that flatMap() is a very versatile operator; it does not have to represent the next step of an asynchronous computation. An Observable is a container for values or errors, so if you want to declaratively express even a very fast computation that can result in an error, wrapping it with an Observable is a good choice as well.
Errors are very much like normal events flowing through our
Observable pipeline. We now understand where they come from, so we should learn how to handle them declaratively. The
Observables we program against most often are a combination of several operators and upstream
Observables. Take this simple example of constructing an insurance agreement based on some data:
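A sketch of what such a pipeline might look like. All of the domain types and service methods below (Person, Income, Score, and so on) are hypothetical stand-ins invented for illustration, not the book's listing:

```java
import rx.Observable;
import rx.schedulers.Schedulers;

public class InsuranceFlow {

    // Hypothetical domain types and services
    static class Person { }
    static class Income { }
    static class Score { final int value = 100; }
    static class InsuranceContract { }
    static class Agreement { }

    static Observable<Person> loadPerson(long id) { return Observable.just(new Person()); }
    static Observable<Income> determineIncome(Person p) { return Observable.just(new Income()); }
    static Observable<Score> score(Person p, Income i) { return Observable.just(new Score()); }
    static Observable<InsuranceContract> loadContract() { return Observable.just(new InsuranceContract()); }
    static Agreement prepare(InsuranceContract c, Score s) { return new Agreement(); }
    static void sendMail(Agreement a) { /* post-processing, e.g., postal mail */ }

    static Observable<Agreement> agreement(long personId) {
        Observable<Person> person = loadPerson(personId).cache();
        // Fork: determine income concurrently with the main flow
        Observable<Income> income = person
                .flatMap(p -> determineIncome(p).subscribeOn(Schedulers.io()));
        // Join person and income, then compute and translate a score
        Observable<Score> scores = person
                .zipWith(income, InsuranceFlow::score)
                .flatMap(s -> s);
        // Join the contract with a sufficiently high score, then post-process
        return loadContract()
                .zipWith(scores.filter(s -> s.value > 50), InsuranceFlow::prepare)
                .doOnNext(InsuranceFlow::sendMail);
    }
}
```

Nothing runs until someone subscribes to agreement()'s result; the pipeline is only a declaration.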
This contrived example shows several steps of some business process: loading a
Person, looking up an available
Income based on
Person (concurrently forking execution), and then joining these two results to compute and translate
Score. Finally, the
InsuranceContract is joined with
Score (but only if it is high) and some post-processing like sending postal mail is performed. You know by now that no processing was performed so far; we barely declared operations to be invoked but until someone subscribes, no business logic is involved. But, what happens if any of these upstream sources result in error notification? There is no error handling visible here but errors are propagated quite conveniently.
All of the operators we’ve encountered so far worked primarily with values, entirely ignoring errors. This is fine: ordinary operators transform values flowing through but skip completion and error notifications, letting them flow downstream. This means that a single error from any upstream Observable will propagate with a cascading failure to all downstream subscribers. Again, this is fine if your business logic requires absolutely all steps to succeed. But sometimes you can safely ignore failures and replace them with fallback values or secondary sources.
The simplest error-handling operator in RxJava is onErrorReturn(): when an error is encountered, it is simply replaced with a fixed value:
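A minimal sketch; the failing determineIncome() service is hypothetical, and -1.0 stands in for the book's Income.no() fallback value:

```java
import rx.Observable;

public class OnErrorReturnExample {

    // Hypothetical service that always fails
    static Observable<Double> determineIncome() {
        return Observable.error(new RuntimeException("Service down"));
    }

    public static void main(String[] args) {
        determineIncome()
                .onErrorReturn(error -> -1.0)  // fixed fallback, stand-in for Income.no()
                .subscribe(System.out::println);
    }
}
```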
The onErrorReturn() operator probably goes without explanation. As long as normal events are flowing through, this operator does nothing. However, the moment it encounters an error notification from upstream, it immediately discards it and replaces it with a fixed value: Income.no(), in this example.
onErrorReturn() is a fluent and very pleasant-to-read alternative to the try-catch block known from the imperative style that returns a fixed result in the catch statement:
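The imperative counterpart might look like this (hypothetical method names mirroring the reactive sketch):

```java
public class ImperativeFallback {

    static double determineIncome() {
        throw new RuntimeException("Service down");
    }

    static double incomeOrDefault() {
        try {
            return determineIncome();
        } catch (RuntimeException e) {
            // Fixed fallback value, mirroring onErrorReturn();
            // note that the exception is silently swallowed here
            return -1.0;
        }
    }
}
```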
In this example, you might have noticed that this catch swallows the original exception and just returns a fixed value. This can be by design but it is generally a good idea to at least log an exception when it occurs. All error handling operators in RxJava behave this way — if you declaratively handle some exception, it will be swallowed. This is something you should definitely take into account; there is nothing worse than a malfunctioning system with a log file that does not reveal any issues.
onErrorReturn() passes the error as an argument, which we happily ignore here. You can either log the exception within onErrorReturn() or use the more specialized diagnostic operators, which are covered in “Monitoring and Debugging”. For the time being, just remember that all error-handling operators in RxJava leave exception logging and monitoring up to you.
Returning a fixed stub result with
onErrorReturn() might sometimes be a good approach, but more often than not you would actually like to lazily compute some fallback value in case of error. There are two possible scenarios here:
- The primary way of generating a stream of data failed with an onError() event, so we switch to a secondary source that is just as good, but for some reason is treated as a backup (slower, more expensive, and so on).
- In the presence of a failure, we would like to replace real data with less expensive, more stable, maybe stale information. For example, when retrieval of fresh data fails, we choose a possibly stale stream from a cache. Another common example is delivering a slightly worse user experience, such as returning a list of global best-sellers rather than personalized recommendations in an online shop.
Clearly, the logic required when an error occurs can be expensive on its own and can itself lead to errors. Therefore, we must somehow encapsulate the fallback logic in a lazy, preferably asynchronous wrapper. What can it be? Of course: an Observable. The onErrorResumeNext() operator basically replaces an error notification with another stream. If you subscribe to an Observable guarded with onErrorResumeNext(), then in case of failure RxJava transparently switches from the main Observable to the fallback one, specified as an argument.
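A compact sketch; both income services are hypothetical stand-ins for the book's determineIncome()/guessIncome() pair:

```java
import rx.Observable;

public class OnErrorResumeNextExample {

    // Primary source that fails
    static Observable<Double> determineIncome() {
        return Observable.error(new RuntimeException("Service down"));
    }

    // Cheaper, less precise backup source
    static Observable<Double> guessIncome() {
        return Observable.just(1234.0);
    }

    public static void main(String[] args) {
        determineIncome()
                .onErrorResumeNext(guessIncome())  // fallback stream on failure
                .subscribe(System.out::println);
    }
}
```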
Interestingly, you can replace onErrorResumeNext() with a combination of the three-argument flatMap(), the concatWith() operator, and first(), assuming determineIncome always emits exactly one value or error:
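One possible shape of that combination, again with hypothetical income services standing in for the stripped listing:

```java
import rx.Observable;

public class ConcatFallback {

    static Observable<Double> determineIncome() {
        return Observable.error(new RuntimeException("Service down"));
    }

    static Observable<Double> guessIncome() {
        return Observable.just(1234.0);
    }

    static Observable<Double> income() {
        return determineIncome()
                .flatMap(
                        Observable::just,          // pass values through unchanged
                        th -> Observable.empty(),  // swallow the error notification
                        Observable::empty)         // keep normal completion as-is
                .concatWith(guessIncome())         // fallback appended after completion
                .first();                          // the real income wins if it appeared
    }

    public static void main(String[] args) {
        System.out.println(income().toBlocking().single());
    }
}
```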
There is something unfamiliar about the flatMap() operator here: it accepts three lambda expressions instead of one:
- The first argument allows replacing each element from the upstream Observable; this is exactly how flatMap() was used so far throughout this book.
- The second argument replaces the optional error notification with another stream. We want to ignore upstream errors, so we simply switch to an empty Observable.
- Finally, when upstream completes normally, we can replace the completion notification with another stream.
The usage of the first() operator is crucial here. By applying first(), we wait only for the very first event to appear. In case of success, we get back a result of determineIncome and RxJava never really subscribes to guessIncome()'s result. But in case of failure, the first Observable essentially yields no events, so the first() operator asks for another item, this time by subscribing to the fallback stream passed as an argument to concatWith().
I hope you realize by now that concatWith() is not needed at all in this example; flatMap() in its most complex form is enough. Even the first() operator is no longer needed. Think about it:
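The simplified version might look like this sketch, with the same hypothetical income services:

```java
import rx.Observable;

public class FlatMapFallback {

    static Observable<Double> determineIncome() {
        return Observable.error(new RuntimeException("Service down"));
    }

    static Observable<Double> guessIncome() {
        return Observable.just(1234.0);
    }

    static Observable<Double> income() {
        return determineIncome()
                .flatMap(
                        Observable::just,       // pass values through
                        th -> guessIncome(),    // replace the error with the fallback stream
                        Observable::empty);     // keep normal completion as-is
    }

    public static void main(String[] args) {
        System.out.println(income().toBlocking().single());
    }
}
```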
The preceding example has an interesting feature: the onError() mapping receives an argument of type Throwable, so theoretically we can return a different fallback stream based on the exception type or message. The onErrorResumeNext() operator has an overloaded version that allows just that:
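A sketch of the Throwable-aware overload; falling back only on TimeoutException is an assumption chosen for illustration:

```java
import rx.Observable;
import java.util.concurrent.TimeoutException;

public class TypedFallback {

    static Observable<Double> guessIncome() {
        return Observable.just(1234.0);
    }

    static Observable<Double> withFallback(Observable<Double> income) {
        return income.onErrorResumeNext(th -> {
            // Choose a fallback stream based on the exception type
            if (th instanceof TimeoutException) {
                return guessIncome();
            }
            return Observable.error(th);  // rethrow everything else
        });
    }
}
```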
Although flatMap() is versatile enough to provide flexible error handling, onErrorResumeNext() is more expressive and easier to read, so you should prefer it.
RxJava provides some operators to handle exception notifications from an upstream
Observable. But do you know what is even worse than an error? Silence. When a system you connect to fails with an exception, this is relatively simple to predict, handle, unit test, and so on. But what if you subscribe to an
Observable and it simply never emits anything, even though you expected to get a result almost immediately? This scenario is much worse than simply having an error. The latency of the system is greatly affected, and it appears as if it was hanging with no clear indication in the logs whatsoever.
Luckily, RxJava provides a built-in timeout() operator that listens to the upstream Observable, constantly monitoring how much time has elapsed since the last event or the subscription. If it so happens that the silence between consecutive events is longer than a given period, the timeout() operator publishes an error notification containing a TimeoutException.
To better understand how timeout() works, first let's consider an Observable that emits only one event after a certain time. For the purposes of this demonstration, we will create an Observable that returns some Confirmation event after 100 milliseconds. We simulate the latency by adding delay(100, MILLISECONDS). Moreover, we would like to simulate additional latency between the event and the completion notification. That is the purpose of the empty() Observable, which normally completes immediately, but with the extra delay() it waits 200 milliseconds before sending the completion. Combining these two streams looks as follows:
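A sketch of the composition, using a trivial Confirmation placeholder class:

```java
import rx.Observable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class SlowConfirmation {

    static class Confirmation { }

    static Observable<Confirmation> confirmation() {
        // Completion arrives 200 ms after the last event
        Observable<Confirmation> delayBeforeCompletion =
                Observable.<Confirmation>empty()
                        .delay(200, MILLISECONDS);
        // The single event arrives 100 ms after subscription
        return Observable.just(new Confirmation())
                .delay(100, MILLISECONDS)
                .concatWith(delayBeforeCompletion);
    }
}
```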
Now, let’s test drive the
timeout() operator in its simplest overloaded version:
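A self-contained sketch (the stream construction is repeated here with a String placeholder instead of a Confirmation class):

```java
import rx.Observable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class TimeoutDemo {

    // Event after 100 ms, completion 200 ms later
    static Observable<String> confirmation() {
        return Observable.just("confirmation")
                .delay(100, MILLISECONDS)
                .concatWith(Observable.<String>empty().delay(200, MILLISECONDS));
    }

    public static void main(String[] args) {
        confirmation()
                .timeout(210, MILLISECONDS)  // both gaps are below 210 ms, so no timeout fires
                .toBlocking()
                .forEach(System.out::println);
    }
}
```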
The 210-millisecond timeout is not a coincidence. The delay between subscription and the arrival of the Confirmation instance is exactly 100 milliseconds, less than the timeout threshold. Also, the delay between this event and the completion notification is 200 milliseconds, again less than 210. Therefore, in this example, the timeout() operator is transparent and does not influence the overall flow of messages. But decrease the timeout() threshold to slightly less than 200 milliseconds (say, 190) and it becomes visible: Confirmation is displayed, but rather than a completion callback we receive an error notification holding a TimeoutException. The first event arrived in considerably less than 190 milliseconds, but the latency between the first event and the second one (the completion notification, actually) exceeded 190 milliseconds, so an error notification was propagated downstream instead. Of course, if the timeout threshold is less than 100 milliseconds, you will not even see the first event.
This was the simplest use case for
timeout(); you’ll find it useful when you want to limit the time you wish to wait for a response or responses. However, sometimes a fixed timeout threshold is too strict and you would like to adjust timeouts at runtime.
Suppose that we built an algorithm for predicting the date of the next solar eclipse. The interface of that algorithm is an Observable<LocalDate> (of course!) which streams future dates of these kinds of events. Imagine for a second that this algorithm is really computationally intensive, which again we are going to simulate, this time by zipping a fixed list of dates with a slowly progressing stream generated by interval(). The first date appears after 500 milliseconds and every subsequent one after 50 milliseconds, thanks to interval(500, 50, MILLISECONDS).
This is quite common in real-life systems: the initial element of the response has relatively high latency as a result of establishing the connection, SSL handshake, query optimization, or whatever the server is doing. But subsequent responses are either readily available or easily retrievable, so latency between them is much lower:
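A sketch of such a simulated producer; the hardcoded eclipse dates are illustrative assumptions:

```java
import rx.Observable;
import java.time.LocalDate;
import java.time.Month;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Eclipses {

    // Slow producer simulation: first item after 500 ms, every next one after 50 ms
    static Observable<LocalDate> nextSolarEclipse(LocalDate after) {
        return Observable
                .just(
                        LocalDate.of(2016, Month.MARCH, 9),
                        LocalDate.of(2016, Month.SEPTEMBER, 1),
                        LocalDate.of(2017, Month.FEBRUARY, 26),
                        LocalDate.of(2017, Month.AUGUST, 21))
                .skipWhile(date -> !date.isAfter(after))
                .zipWith(
                        Observable.interval(500, 50, MILLISECONDS),
                        (date, x) -> date);
    }
}
```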
In these types of scenarios, having one fixed threshold is problematic. The first event should have a pessimistic limit, whereas subsequent limits should be much more aggressive. An overloaded version of timeout() does just that: it accepts two factories of Observables, one marking the timeout of the first event, and a second one produced for each subsequent element. An example is worth a thousand words:
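One possible shape of this overload; the slow date producer and the 1000 ms/100 ms thresholds are assumptions for illustration:

```java
import rx.Observable;
import java.time.LocalDate;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class AdaptiveTimeout {

    // Hypothetical slow producer: first item after 500 ms, later ones every 50 ms
    static Observable<LocalDate> nextSolarEclipse() {
        return Observable
                .just(LocalDate.of(2017, 8, 21), LocalDate.of(2019, 7, 2))
                .zipWith(Observable.interval(500, 50, MILLISECONDS), (d, x) -> d);
    }

    static Observable<LocalDate> guarded() {
        return nextSolarEclipse()
                .timeout(
                        () -> Observable.timer(1000, MILLISECONDS),    // pessimistic limit for the first event
                        date -> Observable.timer(100, MILLISECONDS));  // aggressive limit for each next one
    }

    public static void main(String[] args) {
        guarded().toBlocking().forEach(System.out::println);
    }
}
```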
Here, the first
Observable emits exactly one event after one second—this is the acceptable latency threshold for the first event. The second
Observable is created for each event that appears on the stream and allows fine tuning of the timeout for the subsequent event. Notice that we do not use the
date parameter. You can imagine a timeout value that is adaptive in some sense; for example, we can wait a little bit more for the next event if the previous one was bigger than usual. Or, vice versa, each subsequent event has a lower timeout, adapting to our subscriber’s performance.
It is sometimes useful to also track the latency of each event, even if we do not time out. The handy timeInterval() operator does just that: it replaces each event of type T with TimeInterval<T>, which encapsulates the event but also shows how much time has elapsed since the previous event (or since the subscription, in case of the first event):
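A sketch applying timeInterval() to the simulated slow date stream (the dates are illustrative):

```java
import rx.Observable;
import rx.schedulers.TimeInterval;
import java.time.LocalDate;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class TimeIntervalDemo {

    static Observable<TimeInterval<LocalDate>> intervals() {
        return Observable
                .just(LocalDate.of(2017, 8, 21), LocalDate.of(2019, 7, 2))
                .zipWith(Observable.interval(500, 50, MILLISECONDS), (d, x) -> d)
                .timeInterval();  // wraps each LocalDate in a TimeInterval<LocalDate>
    }

    public static void main(String[] args) {
        intervals().toBlocking().forEach(System.out::println);
    }
}
```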
TimeInterval<LocalDate> has getValue(), which returns the LocalDate, and also getIntervalInMilliseconds(), but it is easier to see how it works by studying the output of the preceding program after subscription. You can clearly see that it took about 533 milliseconds for the first event to arrive but only around 50 milliseconds for each subsequent one:
The timeout() operator has yet another overloaded version that accepts a fallback Observable, replacing the original source in case of timeout; it is very similar in behavior to combining timeout() with onErrorResumeNext(). Keep in mind that an onError notification is terminal; no other event can ever appear in such a stream. Therefore, if you want to signal business conditions that are potentially nonfatal, avoid onError. This is not much different from the common recommendation to avoid controlling program flow by using exceptions. Instead, in Observables, consider wrapping errors in special types of events that can appear multiple times next to ordinary events.
For example, if you are providing a stream of transaction results and some transactions can fail due to business reasons such as insufficient funds, do not use an onError notification for that. Instead, consider creating a TransactionResult abstract class with two concrete subclasses, each representing either success or failure. An onError notification in such a stream then signals that something is going terribly wrong, like a catastrophic failure preventing further emission of any events.
That being said,
onError can represent transient failures of external components or systems. Surprisingly, often simply retrying one more time can lead to success. Other systems might be experiencing a brief load spike, GC pause, or restart. Retrying is an essential mechanism in building robust and resilient applications. RxJava has first-class support for retry.
The simplest version of the retry() operator resubscribes to a failed Observable, hoping that it will keep producing normal events rather than failures. For educational purposes, we will create an Observable that misbehaves severely:
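A sketch of such a misbehaving source, matching the description that follows (the exact listing in the book may differ):

```java
import rx.Observable;

public class Risky {

    // Fails 90 percent of the time; otherwise responds after 0-2 s of artificial delay
    static Observable<String> risky() {
        return Observable.fromCallable(() -> {
            if (Math.random() < 0.1) {
                Thread.sleep((long) (Math.random() * 2000));
                return "OK";
            } else {
                throw new RuntimeException("Transient failure");
            }
        });
    }
}
```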
In 90 percent of the cases, subscribing to risky() ends with a RuntimeException. If you somehow make it to the “OK” branch, an artificial delay between zero and two seconds is injected. Such a risky operation will serve as a demonstration of retry(). Remember that a slow system is generally indistinguishable from a broken one, but often it is even worse because we additionally experience latency. Having timeouts, sometimes even aggressive ones, combined with a retry mechanism is desirable, as long as retrying has no side effects or the operation is idempotent.
The behavior of retry() is fairly straightforward: it pushes all events and the completion notification downstream, but not onError(). The error notification is swallowed (so no exception is logged whatsoever), which is why we use the doOnError() callback. Every time retry() encounters a simulated TimeoutException, it tries subscribing again.
A word of caution here: if your Observable is cached or otherwise guaranteed to always return the same sequence of elements, retry() will not work. If risky() emits an error once, it will continue emitting the same error forever, no matter how many times you resubscribe. To overcome this issue, you can delay the creation of the Observable even further by using defer():
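A sketch of the defer() trick; the stateful counter simulating a transient failure is an illustration, not the book's listing:

```java
import rx.Observable;

public class DeferRetry {

    static int attempts = 0;

    // Succeeds only on the third invocation; a stand-in for a transient failure
    static Observable<String> risky() {
        attempts++;
        return attempts < 3
                ? Observable.<String>error(new RuntimeException("Transient #" + attempts))
                : Observable.just("OK");
    }

    public static void main(String[] args) {
        // defer() calls risky() afresh on every (re)subscription,
        // so retry() gets a chance to see a different outcome each time
        String result = Observable.defer(DeferRetry::risky)
                .retry()
                .toBlocking()
                .single();
        System.out.println(result);
    }
}
```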
Even if an Observable returned from risky() is cached, defer() will invoke risky() multiple times, possibly getting a new Observable each time.
The retry() method is useful, but blindly resubscribing with no throttling or limit on attempts is dangerous. We can quickly saturate the CPU or network, generating a lot of load. Basically, a parameterless retry() is like a while loop with a try block inside it, followed by an empty catch. First, we should limit the number of attempts, which happens to be built in:
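A minimal sketch of the bounded variant:

```java
import rx.Observable;

public class LimitedRetry {

    public static void main(String[] args) {
        Observable
                .<String>error(new RuntimeException("Always failing"))
                .retry(10)  // at most 10 resubscriptions before giving up
                .subscribe(
                        System.out::println,
                        th -> System.err.println("Gave up: " + th));
    }
}
```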
The integer parameter to retry() instructs how many times to resubscribe; thus, retry(0) is equivalent to no retry at all. If the upstream Observable still fails after the tenth retry, the last seen exception is propagated downstream. A more flexible version of retry() leaves the decision about retrying up to you, based on the attempt number and the actual exception:
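A sketch of the predicate-based overload; treating TimeoutException as non-retryable is an assumption matching the discussion that follows:

```java
import rx.Observable;
import java.util.concurrent.TimeoutException;

public class SelectiveRetry {

    public static void main(String[] args) {
        Observable
                .<String>error(new RuntimeException("Transient"))
                // Retry at most 10 times, but never for TimeoutException
                .retry((attempt, e) ->
                        attempt <= 10 && !(e instanceof TimeoutException))
                .subscribe(
                        System.out::println,
                        th -> System.err.println("Gave up: " + th));
    }
}
```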
This version not only limits the number of resubscription attempts to 10, but also stops retrying prematurely if the exception happens to be a TimeoutException.
If failures are transient, waiting a little before a resubscription attempt sounds like a good idea. The retry() operator does not provide such a possibility out of the box, but it is relatively easy to implement with its more robust cousin, retryWhen(). retryWhen() takes a function receiving an Observable of failures: every time the upstream fails, this Observable emits a Throwable. Our responsibility is to transform this Observable in such a way that it emits some arbitrary event when we want to retry (hence the name):
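A minimal sketch of a delayed retry; the failing source is a placeholder:

```java
import rx.Observable;
import static java.util.concurrent.TimeUnit.SECONDS;

public class DelayedRetry {

    static Observable<String> risky() {
        return Observable.error(new RuntimeException("Transient"));
    }

    public static void main(String[] args) {
        risky()
                // Each Throwable is re-emitted one second later; that delayed
                // event triggers a resubscription, so we retry every second, forever
                .retryWhen(failures -> failures.delay(1, SECONDS))
                .subscribe(
                        System.out::println,
                        th -> System.err.println("Never reached: " + th));
    }
}
```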
The function passed to retryWhen() receives an Observable that emits a Throwable every time the upstream fails. We simply delay that event by one second so that it appears in the resulting stream one second later; this is the signal for retryWhen() to attempt a retry. If we simply returned the same stream (retryWhen(x -> x)), retryWhen() would behave exactly like retry(), resubscribing immediately when an error occurs. With retryWhen(), we can also easily simulate retry(10) (well, almost… keep reading):
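A sketch of the (almost) equivalent construction:

```java
import rx.Observable;

public class TakeTenRetry {

    public static void main(String[] args) {
        Observable
                .<String>error(new RuntimeException("Always failing"))
                // Forward the first 10 failures, each triggering an immediate retry.
                // After the 10th failure, take(10) completes, and retryWhen()
                // completes downstream, swallowing the last error
                .retryWhen(failures -> failures.take(10))
                .subscribe(
                        System.out::println,
                        th -> System.err.println("Never reached: " + th),
                        () -> System.out.println("Completed after 10 retries"));
    }
}
```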
We receive an event each time a failure occurs, and the stream we return is supposed to emit an arbitrary event when we want to retry. Thus, we simply forward the first 10 failures, causing each one of them to trigger an immediate retry. But what happens when the eleventh failure appears in the failures Observable? This is where it becomes tricky. The take(10) operator emits an onCompleted event immediately following the tenth failure; therefore, after the tenth retry, retryWhen() receives a completion event. This completion event is interpreted as a signal to stop retrying and complete downstream. It means that after 10 failed attempts, we simply emit nothing and complete. However, if we complete the Observable returned from retryWhen() with an error, this error will be propagated downstream.
In other words, as long as we emit events from the Observable returned inside retryWhen(), they are interpreted as retry requests. However, if we send a completion or error notification, retrying is abandoned and this completion or error is passed downstream. Doing just failures.take(10) will retry 10 times, but in case of yet another failure, we do not propagate the last error but a successful completion instead. Let's have a look at how to fix that:
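One possible shape, matching the description that follows:

```java
import rx.Observable;
import static java.util.concurrent.TimeUnit.SECONDS;

public class RetryWithErrorPropagation {

    public static void main(String[] args) {
        Observable
                .<String>error(new RuntimeException("Always failing"))
                .retryWhen(failures -> failures
                        // Pair each failure with its attempt number (1 to 11)
                        .zipWith(Observable.range(1, 11), (err, attempt) ->
                                attempt < 11
                                        ? Observable.timer(1, SECONDS)   // retry one second later
                                        : Observable.<Long>error(err))   // give up, keep last error
                        .flatMap(x -> x))
                .subscribe(
                        System.out::println,
                        th -> System.err.println("Gave up: " + th));
    }
}
```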
This looks quite complex, but it is also really powerful. We
zip failures with sequence numbers from 1 to 11. We would like to perform as many as 10 retry attempts, so if the attempt sequence number is smaller than 11, we return
timer(1, SECONDS). The
retryWhen() operator will capture this event and retry one second after failure. However, when the 10th retry ends with a failure, we return an
Observable with that error, completing the retry mechanism with the last seen exception.
This gives us a lot of flexibility. We can stop retrying when a certain exception appears or when too many attempts were already performed. Moreover, we can adjust the delay time between attempts! For example, the first retry can appear immediately but the delays between subsequent retries should grow exponentially:
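One possible shape of such a backoff policy. This is a sketch: instead of emitting an arbitrary value such as 42 for the immediate first retry, it uses a zero-delay timer, and both MAX_ATTEMPTS and the delay formula are assumptions:

```java
import rx.Observable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class ExponentialBackoff {

    static final int MAX_ATTEMPTS = 11;

    // Exponential delay: 0 ms for the first retry, then 200, 400, 800... ms
    static long delayMillis(int attempt) {
        return attempt == 1 ? 0 : 100L * (1L << (attempt - 1));
    }

    static <T> Observable<T> withBackoff(Observable<T> source) {
        return source.retryWhen(failures -> failures
                .zipWith(Observable.range(1, MAX_ATTEMPTS), (err, attempt) ->
                        attempt < MAX_ATTEMPTS
                                ? Observable.timer(delayMillis(attempt), MILLISECONDS)
                                : Observable.<Long>error(err))  // last attempt: propagate the error
                .flatMap(x -> x));
    }
}
```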
On the first retry attempt, we return an Observable emitting an arbitrary event immediately so that the retry happens right away. It makes no difference what type and value of event we return (only the moment counts), so 42 is as good as any other value. On the last retry attempt, we forward an exception to the downstream Subscriber containing the last seen failure reason. Finally, for attempts 2 through 10, we calculate the delay using an exponential formula.
Stream composition, especially involving time, can become difficult. Happily, RxJava has great support for unit testing. You can use a
TestSubscriber to assert emitted events, but more importantly, RxJava has a concept of virtual time. In essence, we have full control over the elapsing of time so that tests relying on time are both fast and predictable.
Time is an important factor in almost any application we deal with, and we are not talking only about latency and response times here. Everything happens at some point in time: the order of events is important, and jobs are scheduled in the future. Therefore, we spend countless hours looking for bugs that occur only at certain dates or in certain time zones.
There does not seem to be any established way of testing time-related code. One of the practices, known as property-based testing, aims at generating hundreds of test cases (sometimes randomized) to test a wide spectrum of input arguments. For example, let’s validate a very simple property: for any given date, adding and subsequently subtracting one month gives back the same date:
We used the Spock framework in the Groovy language to rapidly generate 366 different test cases. The code in the expect block is executed for each value generated in the where block. In the where block, we iterate over integers from 0 to 365 and generate all possible dates of 2016, from 2016-01-01 to 2016-12-31. The assertion is fairly obvious and straightforward: if we add and then subtract one month from pretty much any date, we should get that date back. Yet 6 out of 366 test cases fail:
The reason to show this contrived example is to make you realize how complex the time domain is. But the peculiarities of the calendar are not the root cause of the headaches we have when dealing with time in computer systems. RxJava tries to tackle the complexity of concurrency by avoiding state and using pure functions as often as possible.
Being pure means that a function (or operator) should explicitly declare all of its inputs and outputs. This makes testing much easier. However, a dependency on time is almost always hidden and concealed. Every time you call System.currentTimeMillis() or similar APIs, you are depending on an external value that changes… well, over time. We know that depending on singletons is bad for your design, especially from a testability point of view, but reading the current time is effectively relying on a system-wide singleton available everywhere.
One of the patterns to make dependency on time more explicit involves a fake system clock. This pattern requires all programmers to be very rigorous and delegate time-related code to a special service that can be mocked. Java 8 formalizes this method by introducing the
Clock abstraction, which boils down to the following:
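A condensed sketch of java.time.Clock's essential surface (the real class has more factory methods, such as systemDefaultZone() and fixed()):

```java
import java.time.Instant;
import java.time.ZoneId;

// Essential shape of java.time.Clock: two abstract accessors plus a derived helper
public abstract class Clock {

    public abstract ZoneId getZone();

    public abstract Instant instant();

    public long millis() {
        return instant().toEpochMilli();
    }
}
```

Because instant() is abstract, tests can supply a fixed or manually advanced implementation instead of reading the real system time.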
Interestingly, RxJava has a very similar abstraction that we already explored in great detail: Schedulers. How are Schedulers related to the passage of time, you might ask? Well, everything that happens in RxJava either happens immediately or is scheduled for some time in the future, and it is the Scheduler that has full control over when to execute every single line of code in RxJava. Schedulers such as io() or computation() have no special capabilities apart from running tasks at given points in time. However, there is one special Scheduler, TestScheduler, that has two intriguing methods: advanceTimeBy() and advanceTimeTo(). These methods are capable of advancing the scheduler's time manually; otherwise, its clock is frozen forever. This means that no tasks scheduled in the future on this Scheduler are ever executed until we manually advance its time whenever we find it useful.
As an example, let’s look at a sequence of events appearing over time:
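A sketch matching the timeline discussed below (F0, F1, F2 every 10 ms, then S0, S1… every 50 ms); the helper method and names are assumptions:

```java
import rx.Observable;
import rx.schedulers.TestScheduler;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class VirtualTimeStreams {

    static Observable<String> events(TestScheduler scheduler) {
        // F0, F1, F2 at 10, 20, 30 ms
        Observable<String> fast = Observable
                .interval(10, MILLISECONDS, scheduler)
                .map(x -> "F" + x)
                .take(3);
        // S0, S1, ... every 50 ms, starting after fast completes (S0 at 80 ms)
        Observable<String> slow = Observable
                .interval(50, MILLISECONDS, scheduler)
                .map(x -> "S" + x);
        return Observable.concat(fast, slow);
    }
}
```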
When subscribed, we should see three events, F0, F1, and F2, each preceded by a 10 ms delay, followed by an infinite number of S0, S1… events, each after a 50 ms delay. How can we test that we combined all these streams together, that events appear in the correct order and, more importantly, at the correct time? The key is the explicit TestScheduler that we passed wherever it was possible:
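A self-contained sketch of such a test driver, following the step-by-step narration below (the one-second sleeps only demonstrate that real time is irrelevant):

```java
import rx.Observable;
import rx.schedulers.TestScheduler;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class VirtualTimeTest {

    public static void main(String[] args) throws InterruptedException {
        TestScheduler scheduler = new TestScheduler();
        Observable<String> fast = Observable
                .interval(10, MILLISECONDS, scheduler)
                .map(x -> "F" + x)
                .take(3);
        Observable<String> slow = Observable
                .interval(50, MILLISECONDS, scheduler)
                .map(x -> "S" + x);

        Observable.concat(fast, slow).subscribe(System.out::println);
        System.out.println("Subscribed");

        Thread.sleep(1000);                           // real time passes, nothing is printed
        scheduler.advanceTimeBy(25, MILLISECONDS);    // F0 (10 ms), F1 (20 ms)
        Thread.sleep(1000);
        scheduler.advanceTimeBy(75, MILLISECONDS);    // F2 (30 ms), S0 (80 ms)
        Thread.sleep(1000);
        scheduler.advanceTimeTo(200, MILLISECONDS);   // S1 (130 ms), S2 (180 ms)
    }
}
```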
The output you can expect is absolutely predictable and repeatable, entirely independent of system time, transient load spikes, GC pauses, and so on:
Here is what happens:
- After we subscribed to stream
Observable, it began by scheduling
F0task 10 ms in the future. However, it used
TestSchedulerthat sits absolutely idle unless we manually advance time
- Sleeping one second is actually irrelevant and could be omitted,
TestScheduleris independent from system time, thus no events are emitted at all. Sleeping here is only to prove that
TestSchedulerworks. If this were not a
TestSchedulerbut an ordinary (default) one, you could expect several events to appear on the console by now
advanceTimeBy(25ms)forces everything that was scheduled up to 25th millisecond to be triggered and executed. This causes events F0 (10th ms) and F1 (20th ms) to appear on the console
- Sleeping another second does nothing to the output;
TestScheduler ignores real time. However, calling
advanceTimeBy(75 ms) (so the logical time is now the 100th ms) further triggers
F2 (30th ms) and
S0 (80th ms). Nothing more happens
- After one more second of real time elapsed, we advance time to the absolute value of 200 ms using
advanceTimeTo() (which, as opposed to advanceTimeBy(), uses absolute rather than relative time).
S1 (130th ms) and
S2 (180th ms) should have been triggered by that time. But no other event is triggered, even if we wait for eternity
As you can see,
TestScheduler is actually much more clever than an ordinary fake
Clock abstraction. Not only do we have full control over current time, but we can also arbitrarily postpone all events. One caveat is that you must pass
TestScheduler everywhere, basically to every operator that has an optional
Scheduler argument. For your convenience, all such operators use a default
Scheduler, but from a testability point of view, you should prefer passing an explicit
Scheduler. Moreover, consider dependency injection and provide
Schedulers from the outside.
TestScheduler alone is not enough. It works very well in unit tests for which predictability is a must and flickering tests failing sporadically are quite frustrating. Chapter 8 explores tools and techniques that enable unit testing of inherently asynchronous code.
Writing testable code and having a solid suite of tests has been a necessity for a long time, not a novel approach. Whether you write tests first in the test-driven development (TDD) spirit or hack around and confirm your assumptions with a few integration tests later, automated testing is something you should be comfortable with. Therefore, the tools you use (frameworks, libraries, platforms) must support automated tests, and this ability should be one of the aspects considered when making technology decisions.
Fear no more: RxJava has excellent support for unit testing, despite the quite complex domain of asynchronous, event-driven architecture. Explicitness of time, combined with a focus on pure functions and function composition (well grounded in functional programming), greatly improves the testing experience.
First, we need to define our goals for testing
Observables. Having a method returning an
Observable, we probably want to make sure of the following:
- Events are emitted in correct order
- Errors are properly propagated
- Various operators are composed together as predicted
- Events appear at the right moments in time
- Backpressure is supported
And much more. The first two requirements are simple and do not require any special support from RxJava. Basically, collect everything that was emitted and execute assertions using whichever library you prefer:
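A minimal sketch of this style, using plain Java checks rather than any particular assertion library (the concrete values are illustrative):

```java
import java.util.Arrays;
import java.util.List;

import rx.Observable;

public class EmissionOrderTest {

    public static void main(String[] args) {
        // Collect everything the Observable emits into a List,
        // blocking until the stream completes
        List<Integer> events = Observable
                .just(1, 2, 3)
                .toList()
                .toBlocking()
                .single();

        // Any assertion library would do equally well here
        if (!events.equals(Arrays.asList(1, 2, 3))) {
            throw new AssertionError("Unexpected events: " + events);
        }
    }
}
```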
The preceding simple test case transforms an Observable into a
List<Integer> by using the well-known
toList().toBlocking().single() construct. Normally, an Observable is asynchronous, so to have predictable and fast tests, we must perform such a transformation. We can also easily assert
onError() notifications when
BlockingObservable is used. Exceptions are simply rethrown upon subscription. Notice that checked exceptions are wrapped with
RuntimeExceptions — something only a good test can prove:
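Such a test could be sketched as follows; the IOException is an illustrative choice of checked exception:

```java
import java.io.IOException;

import rx.Observable;

public class WrappedExceptionTest {

    public static void main(String[] args) {
        Observable<String> risky = Observable.fromCallable(() -> {
            throw new IOException("network down");  // checked exception
        });

        try {
            risky.toBlocking().single();
            throw new AssertionError("Expected an exception");
        } catch (RuntimeException e) {
            // The checked IOException surfaces wrapped in a RuntimeException
            if (!(e.getCause() instanceof IOException)) {
                throw new AssertionError("Unexpected cause: " + e.getCause());
            }
        }
    }
}
```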
fromCallable() operator is handy when you want to lazily create an
Observable that emits at most one element. It also takes care of error propagation and backpressure, so you should prefer it over
Observable.create() for one-element streams. You can use another type of unit test to confirm our understanding of various operators and their behavior. For example, what does the
concatMapDelayError() operator actually do? You are free to try it once, but having an automated test that everyone can read and quickly grasp is a great advantage:
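One way such a test could look is sketched below; the input values are an assumption, picked so that dividing 100 by the second element (0) fails while the others yield 33, 50, and 100:

```java
import java.util.List;

import rx.Notification;
import rx.Observable;

public class ConcatMapDelayErrorTest {

    public static void main(String[] args) {
        // 100 / 0 fails for the second element, yet the stream continues
        List<Notification<Integer>> events = Observable
                .just(3, 0, 2, 1)
                .concatMapDelayError(x ->
                        Observable.fromCallable(() -> 100 / x))
                .materialize()
                .toList()
                .toBlocking()
                .single();

        // Three OnNext notifications followed by one OnError
        if (events.size() != 4 || !events.get(3).isOnError()) {
            throw new AssertionError("Unexpected notifications: " + events);
        }
    }
}
```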
With the standard
concatMap(), the transformation of the second element (
0) would fail and terminate the entire stream. However, we clearly see that our final stream has four elements: three
OnNext notifications followed by
OnError. Another assertion could actually show that indeed the final values are 33 (100 / 3), 50, and 100. This nicely explains how
concatMapDelayError() works — if any error is generated during transformation, it is not passed downstream immediately; instead, the operator continues. Only when the upstream source completes do we pass along the
onError notification that we found along the way.
In this last test case, we could no longer convert Observable to List because it would throw an exception immediately.
materialize() is useful in such cases: each kind of event (
onNext, onCompleted, or onError) is wrapped in a homogeneous
Notification object. These objects can later be examined, but this is tedious and not very readable. This is where
TestSubscriber becomes handy:
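A sketch of such a test follows. The input here is hypothetical (three zeros, so that three divisions fail), which also demonstrates the aggregated error discussed next:

```java
import rx.Observable;
import rx.exceptions.CompositeException;
import rx.observers.TestSubscriber;

public class DelayErrorSubscriberTest {

    public static void main(String[] args) {
        TestSubscriber<Integer> ts = new TestSubscriber<>();

        Observable
                .just(3, 0, 2, 0, 1, 0)   // hypothetical input; three zeros
                .concatMapDelayError(x ->
                        Observable.fromCallable(() -> 100 / x))
                .subscribe(ts);

        ts.assertValues(33, 50, 100);
        // One might expect ArithmeticException here, but the operator
        // aggregates all three failures into a CompositeException
        ts.assertError(CompositeException.class);
    }
}
```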
TestSubscriber class is quite simple: it stores all events and notifications it received internally so that we can later query it.
TestSubscriber also provides a set of assertions that are very useful in a test case. All we need to do is create an instance of
TestSubscriber, subscribe to an
Observable-under-test, and examine its contents. Strangely, the preceding test actually fails.
assertError() fails because we expect the stream to complete with
ArithmeticException, whereas in reality we got
CompositeException that aggregates all three
ArithmeticExceptions found along the way. This is yet another reason why discovering operators by running them and testing automatically is quite useful.
TestSubscriber is extremely effective when working hand in hand with
TestScheduler. A typical scenario involves interleaving assertions and advancing time to observe how events are flowing over time. Imagine that you have a service that returns an
Observable. The details of its implementation are entirely irrelevant:
Rather than mixing different concerns, we decided to build a decorator over
MyService that adds timeout functionality to whatever the underlying implementation of
MyService is. For the reasons that you can probably guess by now, we also go the extra mile of externalizing the
Scheduler used by the timeout() operator:
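A sketch of such a decorator could look like the following. The class and element type are assumptions (here the service is assumed to expose a single externalCall() method returning Observable&lt;String&gt;), and on timeout we fall back to an empty stream:

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;

interface MyService {
    Observable<String> externalCall();
}

class MyServiceWithTimeout implements MyService {

    private final MyService delegate;
    private final Scheduler scheduler;

    MyServiceWithTimeout(MyService delegate, Scheduler scheduler) {
        this.delegate = delegate;
        this.scheduler = scheduler;
    }

    @Override
    public Observable<String> externalCall() {
        return delegate
                .externalCall()
                .timeout(1, TimeUnit.SECONDS,
                        Observable.<String>empty(),  // complete silently on timeout
                        scheduler);
    }
}
```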
The RxJava approach involves a synthetic, controlled clock that is entirely predictable. One hundred percent accurate but also extremely fast tests are achieved by artificially advancing time. First, we set up a mock of
MyService (using Mockito) that can return any Observable we wish.
We will now write two unit tests. The first ensures that in case of an
externalCall() that never finishes, we receive a timeout precisely after one second:
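Such a test could be sketched as follows. For self-containment, a minimal service interface is declared inline and the timeout decoration (1 second, falling back to an empty stream) is applied directly rather than through a decorator class:

```java
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import org.junit.Test;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

public class TimeoutTest {

    // Minimal stand-in for the service interface from the text
    interface MyService {
        Observable<String> externalCall();
    }

    @Test
    public void timesOutWhenServiceNeverCompletes() {
        TestScheduler testScheduler = new TestScheduler();
        MyService mock = mock(MyService.class);
        given(mock.externalCall()).willReturn(Observable.never());

        Observable<String> withTimeout = mock
                .externalCall()
                .timeout(1, SECONDS, Observable.empty(), testScheduler);

        TestSubscriber<String> ts = new TestSubscriber<>();
        withTimeout.subscribe(ts);

        // Just before the threshold: neither completed nor failed
        testScheduler.advanceTimeBy(950, MILLISECONDS);
        ts.assertNotCompleted();
        ts.assertNoValues();

        // Just after the threshold: completed with no values
        testScheduler.advanceTimeBy(100, MILLISECONDS);
        ts.assertCompleted();
        ts.assertNoValues();
    }
}
```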
never() operator returns an
Observable that never completes and never emits any value. This simulates
MyService’s call that is painfully slow. Then, we make a sequence of two assertions. First, we advance time just before the timeout threshold (950 milliseconds) and make sure that the
TestSubscriber did not yet complete or fail. After 100 more milliseconds — that is, after the timeout threshold — we assert that the stream completed (
assertCompleted()) with no values (
assertNoValues()). We can also take advantage of other built-in assertions, such as assertNoErrors().
The second test should ensure that the timeout does not kick in before the configured threshold:
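A possible sketch of this second test, under the same assumptions as before (inline service interface, 1-second timeout, hypothetical response "42" arriving after 950 ms):

```java
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import org.junit.Test;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;

public class NoPrematureTimeoutTest {

    interface MyService {
        Observable<String> externalCall();
    }

    @Test
    public void doesNotTimeOutBeforeThreshold() {
        TestScheduler testScheduler = new TestScheduler();
        MyService mock = mock(MyService.class);
        // The service responds after 950 ms — just under the 1 s timeout
        given(mock.externalCall()).willReturn(
                Observable.just("42").delay(950, MILLISECONDS, testScheduler));

        Observable<String> withTimeout = mock
                .externalCall()
                .timeout(1, SECONDS, Observable.empty(), testScheduler);

        TestSubscriber<String> ts = new TestSubscriber<>();
        withTimeout.subscribe(ts);

        testScheduler.advanceTimeBy(930, MILLISECONDS);
        ts.assertNoValues();

        // The response at 950 ms beats the timeout at 1000 ms
        testScheduler.advanceTimeBy(50, MILLISECONDS);
        ts.assertValue("42");
        ts.assertCompleted();
    }
}
```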
advanceTimeBy() is equivalent to sleeping in a test, waiting for some action to happen, but without actually sleeping. You can test all sorts of operators like
sample(), and so on, as long as you meticulously allow passing a custom
Scheduler. Speaking of schedulers, it is tempting to use
Schedulers.immediate() as opposed to standard ones. This
Scheduler avoids concurrency by invoking all actions in the context of the caller thread. Such an approach works in some scenarios, but in general you should prefer
TestScheduler because its use cases are far more versatile.
Following the dependency injection principle is very important. Otherwise, you will not be able to replace various
Schedulers with a test one. There are some techniques that can help you; for example, the
RxJavaSchedulersHook plug-in. RxJava has a set of plug-ins that can globally alter the behavior of the library.
RxJavaSchedulersHook, for example, can override the standard
computation() Scheduler (and others) with a test one:
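A sketch of such a hook might look like this; overriding both the computation and I/O Schedulers with the same TestScheduler is a choice made for illustration:

```java
import org.junit.Before;

import rx.Scheduler;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;
import rx.schedulers.TestScheduler;

public class SchedulersHookTest {

    private final TestScheduler testScheduler = new TestScheduler();

    @Before
    public void alwaysUseTestScheduler() {
        // Globally replace the standard Schedulers with our TestScheduler
        RxJavaPlugins.getInstance().registerSchedulersHook(
                new RxJavaSchedulersHook() {
                    @Override
                    public Scheduler getComputationScheduler() {
                        return testScheduler;
                    }

                    @Override
                    public Scheduler getIOScheduler() {
                        return testScheduler;
                    }
                });
    }
}
```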
This global approach has many shortcomings. You can register only
RxJavaSchedulersHook once across the whole JVM, so invoking this
@Before method a second time fails. You can work around this, but it becomes increasingly complex. Also, running unit tests in parallel (normally, unit tests are independent from one another, so it should not be an issue) becomes impossible. Therefore, the only scalable solution for controlling time is explicitly passing
TestScheduler whenever possible.
The one last thing that you can exercise with
TestSubscriber is backpressure. In “Honoring the Requested Amount of Data”, we examined two implementations of an infinite
Observable that produces subsequent natural numbers. One was using an old-fashioned raw
Observable.create(), which does not support backpressure:
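A sketch of such an implementation (the method name naturals1 follows the text; the loop body is a plausible reconstruction):

```java
import rx.Observable;

public class Naturals {

    // Ignores backpressure: emits as fast as the loop can spin,
    // stopping only when the subscriber unsubscribes
    static Observable<Long> naturals1() {
        return Observable.create(subscriber -> {
            long i = 0;
            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        });
    }
}
```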
The more advanced but recommended implementation fully supports backpressure:
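One backpressure-aware way to express the same stream is via SyncOnSubscribe, which emits one value per request; the exact shape below is a sketch, not necessarily the original listing:

```java
import rx.Observable;
import rx.observables.SyncOnSubscribe;

public class NaturalsBackpressure {

    // Emits the next natural number only when one is actually requested
    static Observable<Long> naturals2() {
        return Observable.create(
                SyncOnSubscribe.<Long, Long>createStateful(
                        () -> 0L,                     // initial state
                        (cur, observer) -> {
                            observer.onNext(cur);     // emit current number
                            return cur + 1;           // next state
                        }));
    }
}
```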
From a functionality point of view, these two are the same: both are infinite, but you can, for example, take only a selected subset. However, with
TestSubscriber we can easily unit-test whether a given
Observable also supports backpressure:
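Such a test could be sketched as follows, with the backpressure-aware naturals2 stream declared inline for self-containment:

```java
import org.junit.Test;

import rx.Observable;
import rx.observables.SyncOnSubscribe;
import rx.observers.TestSubscriber;

public class BackpressureTest {

    // Backpressure-aware stream of natural numbers
    private final Observable<Long> naturals2 = Observable.create(
            SyncOnSubscribe.<Long, Long>createStateful(
                    () -> 0L,
                    (cur, observer) -> {
                        observer.onNext(cur);
                        return cur + 1;
                    }));

    @Test
    public void honorsBackpressure() {
        // Request no data up front
        TestSubscriber<Long> ts = new TestSubscriber<>(0);

        naturals2.take(10).subscribe(ts);
        ts.assertNoValues();       // nothing requested, nothing received

        ts.requestMore(100);       // ask for up to 100 items
        ts.assertValueCount(10);   // the source can only deliver 10
        ts.assertCompleted();
    }
}
```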
The crucial part of this example is the
TestSubscriber<>(0) constructor. Without it,
TestSubscriber simply receives everything at the velocity dictated by the source. But, if we request no data prior to subscription,
TestSubscriber does not request any data from an
Observable. This is the reason why we see
assertNoValues() despite the source
Observable clearly being capable of emitting 10 values. Later, we request as many as 100 items (just to be safe), but obviously the source
Observable emits only 10 — as many as it can possibly produce. This test fails for naturals1 almost immediately, and the following message appears:
Observable, of course, knows to stop emitting events after receiving 10, despite being infinite. The
take(10) operator eagerly unsubscribes, ending the internal while loop. However, because
naturals1 ignores the backpressure requests issued by
TestSubscriber, the latter receives items it never wanted. If you replace the source with naturals2, the test passes. This is another reason to avoid plain
Observable.create() in favor of the built-in factories and backpressure-aware utilities.
TestSubscriber has many other assertions. Some of them block waiting for completion; for example,
awaitTerminalEvent(). Most of them, however, assert the state of the subscriber at the current moment, so that we can observe events flowing over time.
Monitoring the behavior of various streams interacting with one another and troubleshooting when issues arise is a difficult subject in RxJava. As a matter of fact, every asynchronous event-driven system is inherently more difficult to troubleshoot compared to blocking architectures. When a synchronous operation fails, the exception flows all the way up the call stack, exposing the exact sequence of operations that caused it, from HTTP server, through all filters, aspects, business logic, and so on.
In an asynchronous system, the call stack is of limited use because when an event crosses the thread boundary, we no longer have the original call stack available. The same applies to distributed systems. This section gives you a few tips on how to make monitoring and debugging easier in applications using RxJava.
Observable has a set of callback methods that you can use to peek into various events, namely:
What they all have in common is that they are not allowed to alter the state of an
Observable in any way and they all return the same
Observable, which makes them an ideal place to sprinkle some logging logic. For example, many newcomers forget that the code within
Observable.create() is executed for each new
Subscriber. This is important especially when a subscription triggers side effects like a network call. To detect such problems, it is a good practice to log every subscription to critical sources:
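The program discussed next could be sketched like this. The dbQuery() stub and its timestamps are hypothetical stand-ins for the real database call:

```java
import java.time.Duration;
import java.time.Instant;

import rx.Observable;

public class SubscribeLogging {

    // Hypothetical stand-in for a real database query
    static Observable<Instant> dbQuery() {
        return Observable
                .just(Instant.ofEpochSecond(0),
                      Instant.ofEpochSecond(10),
                      Instant.ofEpochSecond(25))
                .doOnSubscribe(() ->
                        System.out.println("subscribe() - hitting the DB"));
    }

    public static void main(String[] args) {
        Observable<Instant> timestamps = dbQuery();
        timestamps
                .zipWith(timestamps.skip(1), Duration::between)
                .subscribe(System.out::println);
        // "subscribe()" is printed twice: zipWith() subscribed to
        // timestamps once directly and once more via skip(1)
    }
}
```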
The preceding program queries the database (
dbQuery()) and retrieves some time series data in the form of
Observable<Instant>. We would like to transform this stream a little bit by calculating the duration (using
Duration class from the
java.time package) between each subsequent pair of
Instants: first and second, second and third, and so on. One way to do this is to
zip() stream with itself shifted by one element. This way we tie together the first with the second element, the second with the third, up to the end.
What we did not anticipate is that
zipWith() actually subscribes to all of the underlying streams, effectively subscribing to the same timestamps
Observable twice. This is a problem that you can discover by observing that
doOnSubscribe() is invoked twice. This leads to a duplicated database query.
As for zip(), thanks to backpressure it no longer buffers the faster stream infinitely while waiting for the slower one to emit events. Instead, it asks for a fixed batch of values from each upstream source and would signal
MissingBackpressureException if it received more:
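This behavior can be observed with doOnRequest(); the streams below are illustrative:

```java
import rx.Observable;

public class RequestLogging {

    public static void main(String[] args) {
        Observable<Integer> source = Observable
                .range(1, 1_000_000)
                .doOnRequest(n -> System.out.println("Requested " + n))
                .doOnNext(x -> System.out.println("Got: " + x));

        // zip asks each upstream for a modest batch rather than draining it
        source
                .zipWith(Observable.range(1, 10), (a, b) -> a + b)
                .subscribe();
    }
}
```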
doOnRequest() logs Requested 128, the value chosen by
zip operator. Even when the source is infinite or very large, we should see at most 128 messages such as
Got: ... afterward from a well-behaving source.
doOnNext() is another callback that we can take advantage of. Another useful operator that you should use fairly often is
doOnError(). It invokes a callback every time an error notification flows from upstream. You cannot use
doOnError() for actual error handling; it is for logging only. It does not consume the error notification, which keeps propagating downstream:
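A minimal sketch of this log-then-recover pattern (the failing calculation and fallback value are illustrative):

```java
import rx.Observable;

public class ErrorLogging {

    public static void main(String[] args) {
        Observable
                .fromCallable(() -> 100 / 0)     // fails with ArithmeticException
                .doOnError(ex ->                 // log first...
                        System.err.println("Calculation failed: " + ex))
                .onErrorReturn(ex -> -1)         // ...then recover with a fallback
                .subscribe(System.out::println); // prints -1
    }
}
```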
As clean as
onErrorReturn() looks, it is very easy to swallow exceptions with it. It does provide the exception that we want to replace with a fallback value, but logging it is our responsibility. To keep functions small and composable, logging the error first in
doOnError() and then handling the exception silently in the following line is a little bit more robust. Swallowing an exception without logging it is rarely a good idea and must be a careful decision, not an oversight.
Other operators are rather self-explanatory, with the possible exception of this pair:
doOnEach()
This is invoked for each onNext(), onCompleted(), and
onError(). It can accept either a lambda invoked for each
Notification or an Observer
doOnTerminate()
This is invoked when either onCompleted() or
onError() occurs. It is impossible to distinguish between them, so it might be better to use doOnCompleted() and doOnError() separately
Callbacks are not only useful for logging. Having various telemetric probes built into your application (like simple counters, timers, distribution histograms, and so on) and available externally can greatly reduce troubleshooting time as well as give great insight into what an application is doing. There are many libraries that simplify collecting and publishing metrics, one of them being Dropwizard Metrics. Before you begin using this library, you need to do a little bit of setup:
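The setup could look roughly like this (logger name and reporting interval are arbitrary choices):

```java
import java.util.concurrent.TimeUnit;

import org.slf4j.LoggerFactory;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;

public class MetricsSetup {

    static final MetricRegistry metricRegistry = new MetricRegistry();

    public static void main(String[] args) {
        Slf4jReporter reporter = Slf4jReporter
                .forRegistry(metricRegistry)
                .outputTo(LoggerFactory.getLogger("metrics"))
                .build();
        // Push a snapshot of all metrics to the logger every second
        reporter.start(1, TimeUnit.SECONDS);
    }
}
```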
MetricRegistry is a factory for various metrics. Additionally, we set up a
Slf4jReporter that will push a current snapshot of statistics to a given SLF4J logger. Other reporters publishing to Graphite and Ganglia are available. Having this basic setup, you can begin monitoring your streams.
One of the simplest metrics you can think of is a simple
Counter that can be incremented or decremented. You can use it to measure the number of events that flew through the stream:
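A sketch of this idea, counting events with doOnNext() (the range() source stands in for a real stream):

```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

import rx.Observable;

public class ItemCounting {

    public static void main(String[] args) {
        MetricRegistry metricRegistry = new MetricRegistry();
        Counter items = metricRegistry.counter("items");

        Observable
                .range(1, 1000)              // stand-in for a real stream
                .doOnNext(x -> items.inc())  // count every emitted event
                .subscribe();

        System.out.println(items.getCount());
    }
}
```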
After you subscribe to this
Counter will begin showing how many items have been generated so far. This information becomes even more useful when you publish it to an external monitoring server like Graphite and put it on a chart over time.
Another important metric that you might want to capture is how many items are being concurrently processed right at that moment. For example,
flatMap() can easily spin up hundreds or more concurrent
Observables and subscribe to all of them. Knowing how many such
Observables we have (think about open database connections, web sockets, and so on) can give significant insight into the system:
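One way to sketch this, with makeNetworkCall() as a hypothetical asynchronous operation:

```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

import rx.Observable;

public class OutstandingCalls {

    // Hypothetical asynchronous operation
    static Observable<String> makeNetworkCall(long id) {
        return Observable.just("response-" + id);
    }

    public static void main(String[] args) {
        MetricRegistry metricRegistry = new MetricRegistry();
        Counter outstanding = metricRegistry.counter("outstanding");

        Observable
                .just(1L, 2L, 3L)                  // upstream events
                .doOnNext(x -> outstanding.inc())  // work is about to start
                .flatMap(OutstandingCalls::makeNetworkCall)
                .doOnNext(x -> outstanding.dec())  // one call just responded
                .subscribe();
    }
}
```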
When an event appears from upstream, we increment the counter. When an event appears after
flatMap() (which means one of the asynchronous operations just emitted something), we decrement it. In an idle system, the counter is always zero, but when an upstream observable produces a lot of events and
makeNetworkCall() is relatively slow, this counter skyrockets, clearly indicating where the bottleneck is.
The preceding example assumes that
makeNetworkCall() always returns just one item and never fails (never completes with
onError()). If instead you want to measure the time between subscription to the internal
Observable (when the work actually began) and its completion, it is equally straightforward:
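A sketch of that variant moves the probes onto the inner Observable: increment on subscription, decrement on termination (which covers both completion and error). The makeNetworkCall() stub is hypothetical:

```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

import rx.Observable;

public class OutstandingSubscriptions {

    // Hypothetical asynchronous operation
    static Observable<String> makeNetworkCall(long id) {
        return Observable.just("response-" + id);
    }

    public static void main(String[] args) {
        MetricRegistry metricRegistry = new MetricRegistry();
        Counter outstanding = metricRegistry.counter("outstanding");

        Observable
                .just(1L, 2L, 3L)
                .flatMap(x -> makeNetworkCall(x)
                        .doOnSubscribe(outstanding::inc)   // work actually began
                        .doOnTerminate(outstanding::dec))  // completed or failed
                .subscribe();
    }
}
```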
One of the most complex metrics is
Timer, which measures the duration between two points in time. I cannot overstate the value of such a metric — we can measure network call latency, database query time, user response time, and much more. The way we typically measure time is by taking a snapshot of the current time, doing some lengthy operation, and then noting the difference between the time now and then. This is encapsulated in the Metrics library like this:
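In blocking code, that pattern looks roughly like this (the sleep stands in for a lengthy operation):

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class BlockingTiming {

    public static void main(String[] args) throws Exception {
        MetricRegistry metricRegistry = new MetricRegistry();
        Timer timer = metricRegistry.timer("lengthyOperation");

        Timer.Context ctx = timer.time();  // remember the start timestamp
        Thread.sleep(100);                 // stand-in for a lengthy operation
        ctx.stop();                        // record the elapsed duration

        System.out.println(timer.getCount());
    }
}
```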
The API keeps the operation start time encapsulated in
Timer.Context and assumes that the code we are benchmarking is blocking. But what if we want to measure the time between subscription to an
Observable for which we have no control and its termination?
doOnSubscribe() and doOnTerminate() are insufficient here because we cannot pass
Timer.Context between them. Luckily, RxJava is flexible enough to tackle this problem anyway by one extra layer of composition:
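The trick can be sketched as a small utility (the method and metric names are illustrative):

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

import rx.Observable;

public class SubscriptionTiming {

    // Measures the time from subscription to termination of `external`,
    // an Observable we otherwise have no control over
    static <T> Observable<T> timed(Observable<T> external, Timer timer) {
        return Observable
                .defer(() -> Observable.just(timer.time()))  // start on subscribe
                .flatMap(ctx -> external
                        .doOnTerminate(ctx::stop));          // stop on completion/error
    }

    public static void main(String[] args) {
        MetricRegistry metricRegistry = new MetricRegistry();
        Timer timer = metricRegistry.timer("external");

        timed(Observable.just(42), timer)
                .subscribe(System.out::println);
        System.out.println(timer.getCount());
    }
}
```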
We use a little trick. First, we lazily start the timer with the help of the
defer() operator. This way, the timer starts exactly when subscription happens. Later, we (in a way) replace the
Timer.Context instance with the actual
Observable that we want to benchmark (external). However, just before the external
Observable terminates, we stop the running timer. You can use this technique to measure the time between subscription and termination of any
Observable over which you have no control.
Every reactive library or framework, due to their asynchronous and event-driven nature, is challenging when it comes to debugging and troubleshooting. RxJava is no exception here, but it provides a handful of tools that make developers’ and operations’ life easier.
- First, RxJava embraces errors and makes it easy to handle and manage them
- Second, it provides facilities for monitoring and debugging streams in real time
- Finally, it has excellent unit-testing support
As a matter of fact, being able to take full control over the system clock is immensely useful for time-sensitive operators. RxJava can be difficult to troubleshoot at first. Yet it provides a clear API and strict contract as opposed to superficially simpler blocking code that has hidden race conditions and poor throughput.