Reactive Programming with RxJava: Testing and Troubleshooting

To make the best of reactive programming, we must dive a little bit deeper. This chapter focuses on a few nontrivial but important aspects and principles, among them:

  • Declarative error handling, including retries (see “Error Handling”)
  • Virtual time and testing (see “Virtual Time”)
  • Monitoring and debugging of your Observable streams (see “Monitoring and Debugging”)

Understanding a library or framework is not enough to successfully deploy it to production. The aforementioned aspects are crucial if you want to build solid, stable, and resilient applications.

Error Handling

The Reactive Manifesto enumerates four traits that reactive systems should embrace. Such systems should be: responsive, resilient, elastic, and message driven. Let’s take a look at a couple of these:

Responsive

The system responds in a timely manner if at all possible. […] responsiveness means that problems may be detected quickly and dealt with effectively. […] rapid and consistent response times, […] simplifies error handling.

Resilient

The system stays responsive in the face of failure. […] parts of the system can fail and recover without compromising the system as a whole. […] The client of a component is not burdened with handling its failures.

This section explains why the first two, responsiveness and resiliency, are important and how RxJava helps to achieve them. You are already familiar with the onError() callback used when subscribing to an Observable. But this is just the tip of the iceberg and often not the best approach to handling errors.

Where Are My Exceptions?

Traditionally in Java, errors are indicated by using exceptions. There are two flavors of exceptions in this language:

  • Unchecked exceptions, which are not required in method declaration. If a method throws an unchecked exception (like NullPointerException), it can indicate this in its declaration, but it is not obligatory
  • Checked exceptions, which must be declared and handled in order for the code to compile. Basically, this is every Throwable that does not extend RuntimeException or Error. Example: IOException
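
To make the two flavors concrete, here is a minimal plain-Java sketch. The class ExceptionFlavors and its methods are invented purely for illustration; only IOException, IllegalArgumentException, and NumberFormatException come from the JDK:

```java
import java.io.IOException;

class ExceptionFlavors {

    // Checked: IOException must appear in the throws clause,
    // and every caller must either catch it or declare it in turn.
    static String readConfig(String path) throws IOException {
        if (path.isEmpty()) {
            throw new IOException("No path given");
        }
        return "config from " + path;
    }

    // Unchecked: IllegalArgumentException extends RuntimeException,
    // so declaring it is optional and callers are free to ignore it.
    static int parsePort(String text) {
        int port = Integer.parseInt(text); // may throw NumberFormatException
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    // The compiler forces this caller to deal with the checked exception.
    static String readConfigOrDefault(String path) {
        try {
            return readConfig(path);
        } catch (IOException e) {
            return "default config";
        }
    }
}
```

Removing the try-catch from readConfigOrDefault() would stop the class from compiling, whereas parsePort() can be called anywhere without ceremony.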

There are pros and cons to both of these types of traditional error handling. Unchecked exceptions are easy to add and do not break compile-time backward compatibility. Also, with unchecked exceptions, client code seems cleaner because it does not need to deal with error handling (although it can).

Checked exceptions, on the other hand, are more explicit about the outcome that we can expect from a method. Of course, every method can throw other arbitrary types but checked exceptions are considered part of the API and suggest errors that must be handled explicitly.

Although checked exceptions are impossible to miss and might seem superior in terms of writing error-free code, they proved to be quite unwieldy and obscure. Even official Java APIs are migrating to unchecked exceptions, for example, old JMSException (checked) versus new in JMS 2.0 JMSRuntimeException (unchecked).

RxJava takes an entirely different approach. First, in standard Java, exceptions are a new dimension in the type system. There is a method return type and there are exceptions that are completely orthogonal. When a method opens a File, it can either return InputStream or throw FileNotFoundException. But what if FileNotFoundException is not declared? Or are there any other exceptions we should expect? Exceptions are like an alternative execution path, as if failures were always unexpected and never part of ordinary business flow.

In RxJava, failures are just another type of notification. Every Observable<T> is a sequence of events of type T optionally followed by a completion or error notification. This means that errors are implicitly a part of every stream, and even though we are not required to handle them, there are plenty of operators that declaratively handle errors in a more elegant way. Also, an obtrusive try-catch around Observable will not capture any errors; they are only propagated through the aforementioned error notifications.

But, before we explore a handful of RxJava operators to declaratively handle errors, first we must understand the heuristics that apply when errors are not handled at all. In Java, exceptions can occur almost everywhere, and library creators must ensure they are appropriately handled or at least reported if not otherwise dealt with. The most common problem is a subscribe() that does not define an onError callback:

    Observable
        .create(subscriber -> {
            try {
                subscriber.onNext(1 / 0);
            } catch (Exception e) {
                subscriber.onError(e);
            }
        })
        //BROKEN, missing onError() callback
        .subscribe(System.out::println);

Within create(), we forcibly throw ArithmeticException and invoke an onError() callback on each Subscriber. Unfortunately, subscribe() does not provide onError() implementation. Fortunately, RxJava tries to save the day by throwing OnErrorNotImplementedException wrapping the original ArithmeticException. But which thread throws this exception? This is a difficult question. If Observable is synchronous (as in the preceding example), the client thread indirectly invokes create() and thus throws OnErrorNotImplementedException in case of unhandled onError(). This means that the thread that invoked subscribe() will receive OnErrorNotImplementedException.

The situation becomes more complex if you forget to subscribe for errors and Observable is asynchronous. In that case, the thread that invoked subscribe() might be long gone when OnErrorNotImplementedException is thrown. Under these circumstances, an exception is thrown from whichever thread was about to invoke onError() callback. This can be a thread from Scheduler selected via subscribeOn() or the last observeOn().

Scheduler is free to manage such an unexpected exception in any way it likes; most of the time it simply prints a stack trace to the standard error stream. This is far from perfect: such exceptions bypass your normal logging code, and in a worst-case scenario can go unnoticed. Therefore, a subscribe() that listens only for values and not for errors is often a bad sign and a likely source of missed errors. Even if you do not expect any exceptions to happen (which is rarely the case), at least place error logging that plugs into your logging framework:

    private static final Logger log = LoggerFactory.getLogger(My.class);
    //....
    .subscribe(
        System.out::println,
        throwable -> log.error("That escalated quickly", throwable));

There are many other places where exceptions can occur and sneak in. First of all, it is a good practice to surround a lambda expression within create() with a try-catch block, just like in the previous example:

    Observable.create(subscriber -> {
        try {
            subscriber.onNext(1 / 0);
        } catch (Exception e) {
            subscriber.onError(e);
        }
    });

However, if you forget about the try-catch and let create() throw an exception, RxJava does its best and propagates such an exception as an onError() notification:

    Observable.create(subscriber -> subscriber.onNext(1 / 0));

The two preceding code examples are semantically equivalent. Exceptions thrown from create() are caught internally by RxJava and translated to error notification. Yet, it is advised to explicitly propagate exceptions via subscriber.onError() if possible. Even better, use fromCallable():

    Observable.fromCallable(() -> 1 / 0);

Other places where exceptions can generally occur are any operators that accept user code. In simpler words, any operator that takes a lambda expression as an argument, like map(), filter(), zip(), and many, many more. These operators should not only deal with error notifications coming from an upstream Observable, but also with exceptions thrown from custom mapping functions or predicates. Take this broken mapping and filtering as an example:

    Observable
        .just(1, 0)
        .map(x -> 10 / x);

    Observable
        .just("Lorem", null, "ipsum")
        .filter(String::isEmpty);

The first example throws the familiar ArithmeticException for some elements. The second example will lead to NullPointerException while the filter() predicate is invoked. All lambda expressions passed to higher-order functions like map() or filter() should be pure, whereas throwing an exception is an impure side effect.

RxJava again does its best to handle unexpected exceptions here and the behavior is exactly what you would expect. If any operator in the pipeline throws an exception, it is translated to error notification and passed downstream. Despite RxJava making an effort to fix broken user code, if you suspect your lambda expression to potentially throw an exception, make it explicit by using flatMap():

    Observable
        .just(1, 0)
        .flatMap(x -> (x == 0) ?
            Observable.error(new ArithmeticException("Zero :-(")) :
            Observable.just(10 / x)
        );

flatMap() is a very versatile operator; it does not need to manifest the next step of an asynchronous computation. Observable is a container for values or errors, so if you want to declaratively express even a very fast computation that can result in an error, wrapping it with Observable is a good choice as well.

Declarative try-catch Replacement

Errors are very much like normal events flowing through our Observable pipeline. We now understand where they come from, so we should learn how to handle them declaratively. The Observables we program against most often are a combination of several operators and upstream Observables. Take this simple example of constructing an insurance agreement based on some data:

    Observable<Person> person = //...
    Observable<InsuranceContract> insurance = //...
    Observable<Health> health = person.flatMap(this::checkHealth);
    Observable<Income> income = person.flatMap(this::determineIncome);
    Observable<Score> score = Observable
        .zip(health, income, (h, i) -> asses(h, i))
        .map(this::translate);
    Observable<Agreement> agreement = Observable.zip(
        insurance,
        score.filter(Score::isHigh),
        this::prepare);
    Observable<TrackingId> mail = agreement
        .filter(Agreement::postalMailRequired)
        .flatMap(this::print)
        .flatMap(printHouse::deliver);

This contrived example shows several steps of some business process: loading a Person, looking up an available InsuranceContract, determining Health and Income based on Person (concurrently forking execution), and then joining these two results to compute and translate Score. Finally, the InsuranceContract is joined with Score (but only if it is high) and some post-processing like sending postal mail is performed. You know by now that no processing has been performed so far; we merely declared operations to be invoked, but until someone subscribes, no business logic is executed. But what happens if any of these upstream sources results in an error notification? There is no error handling visible here, but errors are propagated quite conveniently.

All of the operators we’ve encountered so far worked primarily with values, entirely ignoring errors. This is fine: ordinary operators transform values flowing through but skip completion and error notifications, letting them flow downstream. This means that a single error from any upstream Observable will propagate with a cascading failure to all downstream subscribers. Again, this is fine if your business logic requires absolutely all steps to succeed. But sometimes you can safely ignore failures and replace them with fallback values or secondary sources.

REPLACING ERRORS WITH A FIXED RESULT USING ONERRORRETURN()

The simplest error handling operator in RxJava is onErrorReturn(): when it encounters an error, it simply replaces it with a fixed value:

    Observable<Income> income = person
        .flatMap(this::determineIncome)
        .onErrorReturn(error -> Income.no());
    //...
    private Observable<Income> determineIncome(Person person) {
        return Observable.error(new RuntimeException("Foo"));
    }

    class Income {
        static Income no() {
            return new Income(0);
        }
    }

The onErrorReturn() operator probably goes without explanation. As long as normal events are flowing through, this operator does nothing. However, the moment it encounters an error notification from upstream, it immediately discards it and replaces it with a fixed value — Income.no(), in this example. onErrorReturn() is a fluent and very pleasant to read alternative to a try-catch block that returns fixed result in the catch statement known from imperative style:

    try {
        return determineIncome(person);
    } catch (Exception e) {
        return Income.no();
    }

In this example, you might have noticed that this catch swallows the original exception and just returns a fixed value. This can be by design but it is generally a good idea to at least log an exception when it occurs. All error handling operators in RxJava behave this way — if you declaratively handle some exception, it will be swallowed. This is something you should definitely take into account; there is nothing worse than a malfunctioning system with a log file that does not reveal any issues.

onErrorReturn() passes error as an argument, which we happily ignore. You can either log an exception within onErrorReturn() or use the more specialized diagnostics operators, which are covered in “Monitoring and Debugging”. For the time being, just remember that all error handling operators in RxJava leave exception logging and monitoring up to you.

LAZILY COMPUTING FALLBACK VALUE USING ONERRORRESUMENEXT()

Returning a fixed stub result with onErrorReturn() might sometimes be a good approach, but more often than not you would actually like to lazily compute some fallback value in case of error. There are two possible scenarios here:

  • The primary way of generating a stream of data failed (onError() event), so we switch to a secondary source that is just as good, but for some reason we treat it as backup (slower, more expensive, and so on)
  • In the presence of a failure, we would like to replace real data with some less expensive, more stable, maybe stale information. For example, when retrieval of fresh data fails we choose possibly a stale stream from cache. Another common example is delivering a slightly worse user experience; for example, returning a list of global best-sellers rather than personalized recommendations in an online shop

Clearly, the logic required when an error occurs can be expensive on its own and can lead to errors. Therefore, we must somehow encapsulate the fallback logic in a lazy, preferably asynchronous wrapper. What can it be? Of course: an Observable!

    Observable<Person> person = //...
    Observable<Income> income = person
        .flatMap(this::determineIncome)
        .onErrorResumeNext(person.flatMap(this::guessIncome));
    //...
    private Observable<Income> guessIncome(Person person) {
        //...
    }

The onErrorResumeNext() operator basically replaces an error notification with another stream. If you subscribe to an Observable guarded with onErrorResumeNext(), then in case of failure RxJava transparently switches from the main Observable to the fallback one, specified as an argument.

Interestingly, you can replace onErrorResumeNext() with the concatWith() operator, assuming determineIncome always emits exactly one value or error:

    Observable<Income> income = person
        .flatMap(this::determineIncome)
        .flatMap(
            Observable::just,
            th -> Observable.empty(),
            Observable::empty)
        .concatWith(person.flatMap(this::guessIncome))
        .first();

There is something unfamiliar with the flatMap() operator here: it accepts three lambda expressions instead of one:

  • The first argument allows replacing each element from the upstream Observable with new Observable — this is exactly how flatMap() was used so far throughout this book
  • The second argument replaces the optional error notification with another stream. We want to ignore upstream errors so we simply switch to an empty Observable
  • Finally, when upstream completes normally, we can replace the completion notification with another stream

The usage of the first() operator is crucial here. By applying the first() operator, we wait only for the very first event to appear. In case of success, we get back a result of determineIncome and RxJava never really subscribes to guessIncome()’s result. But, in case of failure, the first Observable essentially yields no events, so the first() operator asks for another item, this time by subscribing to the fallback stream passed as an argument to concatWith().

I hope you realized by now that concatWith() is not needed at all in this example; flatMap() in its most complex form is enough. Even the first() operator is no longer needed. Think about it:

    Observable<Income> income = person
        .flatMap(this::determineIncome)
        .flatMap(
            Observable::just,
            th -> person.flatMap(this::guessIncome),
            Observable::empty);

The preceding example has an interesting feature: we can return a different Observable from onError() mapping based on th of type Throwable. So, theoretically we can return a different fallback stream based on the exception message or type. The onErrorResumeNext() operator has an overloaded version that allows just that:

    Observable<Income> income = person
        .flatMap(this::determineIncome)
        .onErrorResumeNext(th -> {
            if (th instanceof NullPointerException) {
                return Observable.error(th);
            } else {
                return person.flatMap(this::guessIncome);
            }
        });

Although flatMap() is versatile enough to provide flexible error handling, onErrorResumeNext() is more expressive and easier to read, so you should prefer it.

Timing Out When Events Do Not Occur

RxJava provides some operators to handle exception notifications from an upstream Observable. But do you know what is even worse than an error? Silence. When a system you connect to fails with an exception, this is relatively simple to predict, handle, unit test, and so on. But what if you subscribe to an Observable and it simply never emits anything, even though you expected to get a result almost immediately? This scenario is much worse than simply having an error. The latency of the system is greatly affected, and it appears as if it was hanging with no clear indication in the logs whatsoever.

Luckily, RxJava provides a built-in timeout() operator that listens to the upstream Observable, constantly monitoring how much time elapsed since the last event or subscription. If it so happens that the silence between consecutive events is longer than a given period, the timeout() operator publishes an error notification that contains TimeoutException.

To better understand how timeout() works, first let’s consider an Observable that emits only one event after a certain time. For the purposes of this demonstration, we will create an Observable that returns some Confirmation event after 100 milliseconds. We simulate the latency by adding delay(100, MILLISECONDS). Moreover, we would like to simulate additional latency between the event and the completion notification. That is the purpose of the empty() Observable, which normally just completes immediately but with the extra delay() waits before sending a completion. Combining these two streams looks as follows:

    Observable<Confirmation> confirmation() {
        Observable<Confirmation> delayBeforeCompletion =
            Observable
                .<Confirmation>empty()
                .delay(200, MILLISECONDS);
        return Observable
            .just(new Confirmation())
            .delay(100, MILLISECONDS)
            .concatWith(delayBeforeCompletion);
    }

Now, let’s test drive the timeout() operator in its simplest overloaded version:

    confirmation()
        .timeout(210, MILLISECONDS)
        .forEach(
            System.out::println,
            th -> {
                if ((th instanceof TimeoutException)) {
                    System.out.println("Too long");
                } else {
                    th.printStackTrace();
                }
            }
        );

The 210-millisecond timeout is not a coincidence. The delay between subscription and arrival of Confirmation instance is exactly 100 milliseconds, so less than the timeout threshold. Also, the delay between this event and completion notification is 200 milliseconds, also less than 210. Therefore, in this example, the timeout() operator is transparent and does not influence the overall flow of messages. But decrease the timeout() threshold to slightly less than 200 milliseconds (say, 190) and it becomes visible.

The Confirmation is displayed, but rather than a completion callback we receive an error notification holding TimeoutException. The first event arrived after 100 milliseconds, well within the threshold, but the latency between the first event and the second one (actually the completion notification) exceeded 190 milliseconds, so an error notification was propagated downstream instead. Of course, if the timeout threshold is less than 100 milliseconds, you will not even see the first event.
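
The rule that timeout() applies here can be modeled without RxJava at all. The following sketch is only a simplified model under our own assumptions, not how the operator is implemented: TimeoutModel and observe() are hypothetical names, each notification is described by the silence that precedes it, and the sequence is cut off with a "TimeoutException" marker as soon as one gap exceeds the threshold:

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutModel {

    // gapsMillis[i] is the silence before the i-th notification,
    // measured from subscription for the very first one.
    // Returns what a subscriber would observe for the given threshold.
    static List<String> observe(long timeoutMillis,
                                String[] notifications,
                                long[] gapsMillis) {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < notifications.length; i++) {
            // A gap exactly equal to the threshold is a race in real life;
            // this model simply lets it pass.
            if (gapsMillis[i] > timeoutMillis) {
                seen.add("TimeoutException");
                return seen; // the error is terminal, nothing follows it
            }
            seen.add(notifications[i]);
        }
        return seen;
    }
}
```

Feeding it the gaps from confirmation(), 100 and 200 milliseconds, a threshold of 210 lets both notifications through, 190 delivers the Confirmation followed by the timeout, and 90 produces only the timeout.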

This was the simplest use case for timeout(); you’ll find it useful when you want to limit the time you wish to wait for a response or responses. However, sometimes a fixed timeout threshold is too strict and you would like to adjust timeouts at runtime.

Suppose that we built an algorithm for predicting the next solar eclipse. The interface of that algorithm is an Observable<LocalDate> (of course!) which streams future dates of these kinds of events. Imagine for a second that this algorithm is really computationally intensive, which again we are going to simulate, this time by zipping a fixed list of dates with a slowly progressing stream generated by interval(). The first date appears after 500 milliseconds, and every subsequent one after 50 milliseconds, thanks to interval(500, 50, MILLISECONDS).

This is quite common in real-life systems: the initial element of the response has relatively high latency as a result of establishing the connection, SSL handshake, query optimization, or whatever the server is doing. But subsequent responses are either readily available or easily retrievable, so latency between them is much lower.

In these types of scenarios, having one fixed threshold is problematic. The first event should have a pessimistic limit, whereas subsequent limits should be much more aggressive. The overloaded version of timeout() does just that: it accepts two factories of Observables, one marking the timeout of the first event, and the second one for each subsequent element. An example is worth a thousand words:

    nextSolarEclipse(LocalDate.of(2016, SEPTEMBER, 1))
        .timeout(
            () -> Observable.timer(1000, TimeUnit.MILLISECONDS),
            date -> Observable.timer(100, MILLISECONDS))

Here, the first Observable emits exactly one event after one second—this is the acceptable latency threshold for the first event. The second Observable is created for each event that appears on the stream and allows fine tuning of the timeout for the subsequent event. Notice that we do not use the date parameter. You can imagine a timeout value that is adaptive in some sense; for example, we can wait a little bit more for the next event if the previous one was bigger than usual. Or, vice versa, each subsequent event has a lower timeout, adapting to our subscriber’s performance.

It is sometimes useful to also track the latency of each event, even if we do not timeout. The handy timeInterval() operator does just that: it replaces each event of type T with TimeInterval<T> that encapsulates the event but also shows how much time has elapsed since the previous event (or subscription in case of first event):

    Observable<TimeInterval<LocalDate>> intervals =
        nextSolarEclipse(LocalDate.of(2016, JANUARY, 1))
            .timeInterval();

Apart from getValue() that returns LocalDate, TimeInterval<LocalDate> also has getIntervalInMilliseconds() but it is easier to see how it looks studying the output of the preceding program after subscription. You can clearly see that it took 533 milliseconds for the first event to arrive but only around 50 milliseconds for each one subsequently:

    TimeInterval [intervalInMilliseconds=533, value=2016-03-09]
    TimeInterval [intervalInMilliseconds=49, value=2016-09-01]
    TimeInterval [intervalInMilliseconds=50, value=2017-02-26]
    TimeInterval [intervalInMilliseconds=50, value=2017-08-21]
    TimeInterval [intervalInMilliseconds=50, value=2018-02-15]
    TimeInterval [intervalInMilliseconds=50, value=2018-07-13]
    TimeInterval [intervalInMilliseconds=50, value=2018-08-11]
    TimeInterval [intervalInMilliseconds=50, value=2019-01-06]
    TimeInterval [intervalInMilliseconds=51, value=2019-07-02]
    TimeInterval [intervalInMilliseconds=49, value=2019-12-26]

The timeout() operator has yet another overloaded version that accepts the fallback Observable replacing the original source in case of error. It is very similar in behavior to onErrorResumeNext().

Retrying After Failures

The onError notification is terminal; no other event can ever appear in such a stream. Therefore, if you want to signal business conditions that are potentially nonfatal, avoid onError. This is not much different from a common recommendation to avoid controlling the program flow by using exceptions. Instead, in Observables consider wrapping errors in special types of events that can emerge multiple times next to ordinary events.

For example, if you are providing a stream of transaction results and some transactions can fail due to business reasons such as insufficient funds, do not use onError notification for that. Instead, consider creating a TransactionResult abstract class with two concrete subclasses, each representing either success or failure. onError notification in such a stream signals that something is going terribly wrong, like a catastrophic failure preventing further emission of any event.
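
One possible shape of such a class hierarchy is sketched below. Apart from TransactionResult itself, every name here (TransactionSuccess, TransactionFailure, isSuccess(), describe(), and the fields) is an assumption made for illustration:

```java
// Base type for everything the stream of transaction results can carry.
abstract class TransactionResult {
    abstract boolean isSuccess();
    abstract String describe();
}

// Ordinary outcome: the transaction went through.
final class TransactionSuccess extends TransactionResult {
    private final String transactionId;

    TransactionSuccess(String transactionId) {
        this.transactionId = transactionId;
    }

    @Override boolean isSuccess() { return true; }
    @Override String describe() { return "OK: " + transactionId; }
}

// Business-level failure, delivered as a regular event rather than onError.
final class TransactionFailure extends TransactionResult {
    private final String reason;

    TransactionFailure(String reason) {
        this.reason = reason;
    }

    @Override boolean isSuccess() { return false; }
    @Override String describe() { return "FAILED: " + reason; }
}
```

A stream of type Observable<TransactionResult> can then carry any number of TransactionFailure events without terminating, leaving onError for genuinely fatal conditions.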

That being said, onError can represent transient failures of external components or systems. Surprisingly, often simply retrying one more time can lead to success. Other systems might be experiencing a brief load spike, GC pause, or restart. Retrying is an essential mechanism in building robust and resilient applications. RxJava has first-class support for retry.

The simplest version of the retry() operator resubscribes to a failed Observable hoping that it will keep producing normal events rather than failures. For educational purposes, we will create an Observable that misbehaves severely:

    Observable<String> risky() {
        return Observable.fromCallable(() -> {
            if (Math.random() < 0.1) {
                Thread.sleep((long) (Math.random() * 2000));
                return "OK";
            } else {
                throw new RuntimeException("Transient");
            }
        });
    }

In 90 percent of the cases, subscribing to risky() ends with a RuntimeException. If you somehow make it to the “OK” branch, an artificial delay between zero and two seconds is injected. Such a risky operation will serve as a demonstration of retry():

    risky()
        .timeout(1, SECONDS)
        .doOnError(th -> log.warn("Will retry", th))
        .retry()
        .subscribe(log::info);

Remember that a slow system is generally indistinguishable from a broken one, but often it is even worse because we experience additional latency. Having timeouts, sometimes even aggressive ones with a retry mechanism is desirable—of course, as long as retrying has no side effects or the operation is idempotent.

The behavior of retry() is fairly straightforward: it pushes all events and completion notification downstream, but not onError(). The error notification is swallowed (so no exception is logged whatsoever), thus we use doOnError() callback. Every time retry() encounters a simulated RuntimeException or TimeoutException, it tries subscribing again.

A word of caution here: if your Observable is cached or otherwise guaranteed to always return the same sequence of elements, retry() will not work:

    risky().cache().retry() //BROKEN

If risky() emits errors once, it will continue emitting them forever, no matter how many times you resubscribe. To overcome this issue, you can delay the creation of Observable even further by using defer():

    Observable
        .defer(() -> risky())
        .retry()

Even if an Observable returned from risky() is cached, defer() calls risky() multiple times, possibly getting a new Observable each time.

RETRYING BY USING DELAY AND LIMITED ATTEMPTS

A plain retry() method is useful, but blindly resubscribing with no throttling or limiting attempts is dangerous. We can quickly saturate the CPU or network, generating a lot of load. Basically, parameterless retry() is a while loop with a try block within it, followed by an empty catch. First, we should limit the number of attempts, which happens to be built in:

    risky()
        .timeout(1, SECONDS)
        .retry(10)

The integer parameter to retry() instructs how many times to resubscribe, thus retry(0) is equivalent to no retry at all. If the upstream Observable still fails after the tenth retry, the last seen exception is propagated downstream. A more flexible version of retry() leaves you with a decision about retry, based on the attempt number and the actual exception:

    risky()
        .timeout(1, SECONDS)
        .retry((attempt, e) ->
            attempt <= 10 && !(e instanceof TimeoutException))

This version not only limits the number of resubscription attempts to 10, but also drops retrying prematurely if the exception happens to be TimeoutException.
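
If it helps, the predicate-based variant can be pictured as the plain loop below. This is an analogy in ordinary blocking Java, not RxJava's implementation; RetryLoop and callWithRetry() are hypothetical names, and we assume the attempt counter starts at 1 for the first failure:

```java
import java.util.function.BiPredicate;
import java.util.function.Supplier;

class RetryLoop {

    // Calls the action until it succeeds or shouldRetry says "no more".
    // The predicate receives the number of failures seen so far
    // and the exception that caused the latest one.
    static <T> T callWithRetry(Supplier<T> action,
                               BiPredicate<Integer, Throwable> shouldRetry) {
        int attempt = 0;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                attempt++;
                if (!shouldRetry.test(attempt, e)) {
                    throw e; // give up: propagate the last seen exception
                }
                // otherwise swallow the failure and go around again
            }
        }
    }
}
```

Passing (attempt, e) -> true turns this into the unbounded loop with an empty catch described earlier, which is exactly why the parameterless retry() deserves some caution.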

If failures are transient, waiting a little bit prior to a resubscription attempt sounds like a good idea. The retry() operator does not provide such a possibility out of the box, but it is relatively easy to implement. A more robust version of retry() called retryWhen() takes a function receiving an Observable of failures. Every time an upstream fails, this Observable emits a Throwable. Our responsibility is to transform this Observable in such a way that it emits some arbitrary event when we want to retry (hence the name):

    risky()
        .timeout(1, SECONDS)
        .retryWhen(failures -> failures.delay(1, SECONDS))

The preceding example of retryWhen() receives an Observable that emits a Throwable every time the upstream fails. We simply delay that event by one second so that it appears in the resulting stream one second later. This is a signal to retryWhen() that it should attempt retry. If we simply returned the same stream (retryWhen(x -> x)), retryWhen() would behave exactly like retry(), resubscribing immediately when an error occurs. With retryWhen(), we can also easily simulate retry(10) (well, almost… keep reading):

    .retryWhen(failures -> failures.take(10))

We receive an event each time a failure occurs. The stream we return is supposed to emit an arbitrary event when we want to retry. Thus, we simply forward the first 10 failures, causing each one of them to be retried immediately. But what happens when the eleventh failure occurs in the failures Observable? This is where it becomes tricky.

The take(10) operator emits an onComplete event immediately following the 10th failure. Therefore, after the 10th retry, retryWhen() receives a completion event. This completion event is interpreted as a signal to stop retrying and complete downstream. It means that after 10 failed attempts, we simply emit nothing and complete. However, if we complete Observable returned inside retryWhen() with an error, this error will be propagated downstream.

In other words, as long as we emit any event from an Observable inside retryWhen(), they are interpreted as retry requests. However, if we send a completion or error notification, retry is abandoned and this completion or error is passed downstream. Doing just failures.take(10) will retry 10 times, but in case of yet another failure, we do not propagate the last error but the successful completion, instead. Let’s have a look at it:

    static final int ATTEMPTS = 11;
    //...
    .retryWhen(failures -> failures
        .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
            attempt < ATTEMPTS ?
                Observable.timer(1, SECONDS) :
                Observable.error(err))
        .flatMap(x -> x)
    )

This looks quite complex, but it is also really powerful. We zip failures with sequence numbers from 1 to 11. We would like to perform as many as 10 retry attempts, so if the attempt sequence number is smaller than 11, we return timer(1, SECONDS). The retryWhen() operator will capture this event and retry one second after failure. However, when the 10th retry ends with a failure, we return an Observable with that error, completing the retry mechanism with the last seen exception.

This gives us a lot of flexibility. We can stop retrying when a certain exception appears or when too many attempts have already been made. Moreover, we can adjust the delay between attempts! For example, the first retry can happen immediately, but the delays between subsequent retries should grow exponentially:

.retryWhen(failures -> failures
    .zipWith(Observable.range(1, ATTEMPTS),
        this::handleRetryAttempt)
    .flatMap(x -> x)
)
//...
Observable<Long> handleRetryAttempt(Throwable err, int attempt) {
    switch (attempt) {
        case 1:
            return Observable.just(42L);
        case ATTEMPTS:
            return Observable.error(err);
        default:
            long expDelay = (long) Math.pow(2, attempt - 2);
            return Observable.timer(expDelay, SECONDS);
    }
}

On the first retry attempt, we return an Observable emitting an arbitrary event immediately, so that the retry happens right away. It makes no difference what type and value of event we return (only the moment counts), so 42 is as good as any other value. On the last retry attempt, we forward an exception containing the last seen failure reason to the downstream Subscriber. Finally, for attempts 2 through 10, the delay grows exponentially: it is 2 raised to the power of (attempt - 2) seconds, that is 1, 2, 4, and so on, up to 256 seconds.
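To make the resulting schedule concrete, here is a minimal plain-Java sketch that reproduces only the delay arithmetic of handleRetryAttempt(), with no RxJava involved. The class BackoffSchedule and the method delaySeconds() are hypothetical names invented for this illustration; an empty result stands for "give up and propagate the error", and attempt numbers are assumed to start at 1.

```java
import java.util.OptionalLong;

final class BackoffSchedule {
    static final int ATTEMPTS = 11;

    /**
     * Delay in seconds before retrying after the given failure number,
     * or empty when the error should be propagated instead of retrying.
     * Assumes attempt >= 1.
     */
    static OptionalLong delaySeconds(int attempt) {
        if (attempt == 1) {
            return OptionalLong.of(0);          // first retry happens immediately
        }
        if (attempt >= ATTEMPTS) {
            return OptionalLong.empty();        // out of attempts: give up
        }
        // attempts 2..10 wait 1, 2, 4, ... 256 seconds
        return OptionalLong.of((long) Math.pow(2, attempt - 2));
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= ATTEMPTS; attempt++) {
            System.out.println(attempt + " -> " + delaySeconds(attempt));
        }
    }
}
```

Summing the delays (0 + 1 + 2 + … + 256) shows that a subscriber facing a permanently failing source waits 511 seconds, roughly eight and a half minutes, before it finally sees the error. That is worth keeping in mind when choosing ATTEMPTS.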

Testing and Debugging

Stream composition, especially involving time, can become difficult. Happily, RxJava has great support for unit testing. You can use a TestSubscriber to assert emitted events, but more importantly, RxJava has a concept of virtual time. In essence, we have full control over the elapsing of time so that tests relying on time are both fast and predictable.

Virtual Time

Time is an important factor in almost any application we deal with, and we are not talking about latency and response times here. Everything happens at some point in time, the order of events is important, and jobs are scheduled in the future. As a result, we spend countless hours looking for bugs that occur only on certain dates or in certain time zones.

There does not seem to be any established way of testing time-related code. One of the practices, known as property-based testing, aims at generating hundreds of test cases (sometimes randomized) to test a wide spectrum of input arguments. For example, let’s validate a very simple property: for any given date, adding and subsequently subtracting one month gives back the same date:

class PlusMinusMonthSpec extends Specification {
    static final LocalDate START_DATE =
        LocalDate.of(2016, Month.JANUARY, 1)

    @Unroll
    def '#date +/- 1 month gives back the same date'() {
        expect:
            date == date.plusMonths(1).minusMonths(1)
        where:
            date << (0..365).collect {
                day -> START_DATE.plusDays(day)
            }
    }
}

We used the Spock framework in the Groovy language to rapidly generate 366 different test cases. The code in the expect block is executed for each value generated in the where block. In the where block, we iterate over integers from 0 to 365 and generate all possible dates from 2016-01-01 to 2016-12-31. The assertion is fairly obvious and straightforward: if we add and then subtract one month for pretty much any date, we should get that date back. Yet 6 out of 366 test cases fail:

date == date.plusMonths(1).minusMonths(1)
|    |  |    |             |
|    |  |    2016-02-29    2016-01-29
|    |  2016-01-30
|    false
2016-01-30

date == date.plusMonths(1).minusMonths(1)
|    |  |    |             |
|    |  |    2016-02-29    2016-01-29
|    |  2016-01-31
|    false
2016-01-31

date == date.plusMonths(1).minusMonths(1)
|    |  |    |             |
|    |  |    2016-04-30    2016-03-30
|    |  2016-03-31
|    false
2016-03-31
...
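If you prefer to stay in Java, the same property can be checked with nothing but java.time and streams. The sketch below is a plain-Java counterpart of the Spock specification above, not a replacement for it; the class PlusMinusMonthCheck and its method failingDates() are names made up for this illustration.

```java
import java.time.LocalDate;
import java.time.Month;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PlusMinusMonthCheck {
    static final LocalDate START_DATE = LocalDate.of(2016, Month.JANUARY, 1);

    /** Dates in 2016 for which adding and then subtracting one month changes the date. */
    static List<LocalDate> failingDates() {
        return IntStream.rangeClosed(0, 365)
                .mapToObj(START_DATE::plusDays)
                .filter(date -> !date.equals(date.plusMonths(1).minusMonths(1)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        failingDates().forEach(System.out::println);
    }
}
```

Every failing date is one whose day of month does not exist in the following month. plusMonths(1) clamps it to the last valid day, and minusMonths(1) cannot know what the original day was.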

The reason to show this contrived example is to make you realize how complex the time domain is. But the peculiarities of the calendar are not the root cause of the headaches we have when dealing with time in computer systems. RxJava tries to tackle the complexity of concurrency by avoiding state and using pure functions as often as possible.

Being pure means that a function (or operator) should explicitly declare all of its inputs and outputs. This makes testing much easier. However, the dependency on time is almost always hidden. Every time you see new Date(), Instant.now(), System.currentTimeMillis(), and many others, you are depending on an external value that changes… well, over time. We know that depending on singletons is bad for your design, especially from a testability point of view. But reading the current time is effectively relying on a system-wide singleton available everywhere.

One of the patterns to make dependency on time more explicit involves a fake system clock. This pattern requires all programmers to be very rigorous and delegate time-related code to a special service that can be mocked. Java 8 formalizes this method by introducing the Clock abstraction, which boils down to the following:

public abstract class Clock {
    public static Clock system(ZoneId zone) { /* ... */ }

    public long millis() {
        return instant().toEpochMilli();
    }

    public abstract Instant instant();
}
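To see why this abstraction helps in tests, consider the following minimal sketch. Both classes are hypothetical and exist only for illustration: ManualClock is a hand-rolled Clock whose time moves only when the test says so, and SessionToken is an arbitrary piece of business logic that reads time exclusively through the injected Clock.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical test clock: time stands still until advanceBy() is called. */
final class ManualClock extends Clock {
    private Instant now;
    private final ZoneId zone;

    ManualClock(Instant start, ZoneId zone) {
        this.now = start;
        this.zone = zone;
    }

    void advanceBy(Duration duration) {
        now = now.plus(duration);
    }

    @Override public ZoneId getZone() { return zone; }
    @Override public Clock withZone(ZoneId newZone) { return new ManualClock(now, newZone); }
    @Override public Instant instant() { return now; }
}

/** Hypothetical business object that never calls Instant.now() directly. */
final class SessionToken {
    private final Clock clock;
    private final Instant expiresAt;

    SessionToken(Duration timeToLive, Clock clock) {
        this.clock = clock;
        this.expiresAt = clock.instant().plus(timeToLive);
    }

    boolean isExpired() {
        return !clock.instant().isBefore(expiresAt);
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock(Instant.parse("2016-01-01T00:00:00Z"), ZoneOffset.UTC);
        SessionToken token = new SessionToken(Duration.ofMinutes(30), clock);
        System.out.println(token.isExpired());   // time has not moved yet
        clock.advanceBy(Duration.ofMinutes(30));
        System.out.println(token.isExpired());   // 30 virtual minutes later
    }
}
```

In production code you would pass Clock.systemUTC() or a similar system clock instead. The test never sleeps, yet it exercises a 30-minute expiry.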

Interestingly, RxJava has a very similar abstraction that we already explored in great detail: Schedulers. How are Schedulers related to the passage of time, you might ask? Well, everything that happens in RxJava either happens immediately or is scheduled for some time in the future. It is the Scheduler that has full control over when to execute every single line of code in RxJava.

Schedulers in Unit Testing

Various Schedulers like io() or computation() have no special capabilities apart from running tasks at given points in time. However, there is one special test() Scheduler that has two intriguing methods: advanceTimeBy() and advanceTimeTo(). These methods of TestScheduler are capable of advancing the time manually; otherwise, it’s frozen forever. This means that no tasks scheduled in the future on this Scheduler are ever executed until we manually advance time whenever we find it useful.

As an example, let’s look at a sequence of events appearing over time:

TestScheduler sched = Schedulers.test();
Observable<String> fast = Observable
    .interval(10, MILLISECONDS, sched)
    .map(x -> "F" + x)
    .take(3);
Observable<String> slow = Observable
    .interval(50, MILLISECONDS, sched)
    .map(x -> "S" + x);

Observable<String> stream = Observable.concat(fast, slow);
stream.subscribe(System.out::println);
System.out.println("Subscribed");

When subscribed, we should see three events, F0, F1, and F2, each preceded by a 10 ms delay, followed by an infinite number of S0, S1… events, each after a 50 ms delay. How can we test that we combined all these streams correctly, that events appear in the correct order and, more importantly, at the correct time? The key is the explicit TestScheduler that we passed wherever it was possible:

TimeUnit.SECONDS.sleep(1);
System.out.println("After one second");
sched.advanceTimeBy(25, MILLISECONDS);
TimeUnit.SECONDS.sleep(1);
System.out.println("After one more second");
sched.advanceTimeBy(75, MILLISECONDS);
TimeUnit.SECONDS.sleep(1);
System.out.println("...and one more");
sched.advanceTimeTo(200, MILLISECONDS);

The output you can expect is absolutely predictable and repeatable, entirely independent of system time, transient load spikes, GC pauses, and so on:

Subscribed
After one second
F0
F1
After one more second
F2
S0
...and one more
S1
S2

Here is what happens:

  1. After we subscribed to the stream Observable, it began by scheduling the F0 task 10 ms in the future. However, it used a TestScheduler that sits absolutely idle unless we manually advance time.
  2. Sleeping for one second is actually irrelevant and could be omitted. TestScheduler is independent of system time, so no events are emitted at all. Sleeping here only proves that TestScheduler works. If this were not a TestScheduler but an ordinary (default) one, you could expect several events to appear on the console by now.
  3. Calling advanceTimeBy(25ms) forces everything that was scheduled up to the 25th millisecond to be triggered and executed. This causes events F0 (10th ms) and F1 (20th ms) to appear on the console.
  4. Sleeping another second does nothing to the output; TestScheduler ignores real time. However, calling advanceTimeBy(75ms) (so the logical time is now at the 100th ms) further triggers F2 (30th ms) and S0 (80th ms). Nothing more happens.
  5. After one more second of real time has elapsed, we advance time to the absolute value of 200 ms with advanceTimeTo(200ms); advanceTimeBy(), in contrast, uses relative time. TestScheduler realizes that S1 (130th ms) and S2 (180th ms) should have been triggered by that time. But no other event is triggered, even if we wait for eternity.
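The steps above become less mysterious once you realize that a virtual-time scheduler can be little more than a priority queue and a counter. The following toy sketch is emphatically not RxJava's TestScheduler implementation; ToyVirtualScheduler and all of its members are invented for illustration, and the due times of the events (10, 20, 30, 80, 130, 180, and 230 ms) are hard-coded to mirror the fast and slow streams rather than derived from real operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

/** Toy virtual-time scheduler: tasks run only when time is advanced manually. */
final class ToyVirtualScheduler {
    private static final class Task implements Comparable<Task> {
        final long dueMillis;
        final long order;        // tie-breaker: FIFO for tasks due at the same time
        final Runnable action;

        Task(long dueMillis, long order, Runnable action) {
            this.dueMillis = dueMillis;
            this.order = order;
            this.action = action;
        }

        @Override
        public int compareTo(Task other) {
            int byTime = Long.compare(dueMillis, other.dueMillis);
            return byTime != 0 ? byTime : Long.compare(order, other.order);
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>();
    private long nowMillis;      // virtual clock, frozen unless advanced
    private long nextOrder;

    void schedule(Runnable action, long delay, TimeUnit unit) {
        queue.add(new Task(nowMillis + unit.toMillis(delay), nextOrder++, action));
    }

    void advanceTimeBy(long delay, TimeUnit unit) {
        advanceTimeTo(nowMillis + unit.toMillis(delay), TimeUnit.MILLISECONDS);
    }

    void advanceTimeTo(long time, TimeUnit unit) {
        long target = unit.toMillis(time);
        // Run everything that is due up to (and including) the target time.
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Task task = queue.poll();
            nowMillis = task.dueMillis;
            task.action.run();
        }
        nowMillis = Math.max(nowMillis, target);
    }

    /** Replays the scenario from the text and returns what was "printed". */
    static List<String> demo() {
        ToyVirtualScheduler sched = new ToyVirtualScheduler();
        List<String> log = new ArrayList<>();
        long[] dueTimes = {10, 20, 30, 80, 130, 180, 230};
        String[] names = {"F0", "F1", "F2", "S0", "S1", "S2", "S3"};
        for (int i = 0; i < names.length; i++) {
            String name = names[i];
            sched.schedule(() -> log.add(name), dueTimes[i], TimeUnit.MILLISECONDS);
        }
        sched.advanceTimeBy(25, TimeUnit.MILLISECONDS);   // virtual time: 25 ms
        log.add("--");
        sched.advanceTimeBy(75, TimeUnit.MILLISECONDS);   // virtual time: 100 ms
        log.add("--");
        sched.advanceTimeTo(200, TimeUnit.MILLISECONDS);  // virtual time: 200 ms
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Notice that nothing in this class ever consults the system clock. That is the whole point: the order and grouping of events depend only on the calls to advanceTimeBy() and advanceTimeTo().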

As you can see, TestScheduler is actually much more clever than an ordinary fake Clock abstraction. Not only do we have full control over current time, but we can also arbitrarily postpone all events. One caveat is that you must pass TestScheduler everywhere, basically to every operator that has an optional Scheduler argument. For your convenience, all such operators use a default computation() Scheduler, but from a testability point of view, you should prefer passing an explicit Scheduler. Moreover, consider dependency injection and provide Schedulers from the outside.

But having TestScheduler alone is not enough. It works very well in unit tests for which predictability is a must and flickering tests failing sporadically are quite frustrating. Chapter 8 explores tools and techniques that enable unit testing of inherently asynchronous Observables.

Unit Testing

Writing testable code and having a solid suite of tests has been a necessity for a long time, not a novel approach. Whether you write tests first in the spirit of test-driven development (TDD) or hack around and confirm your assumptions with a few integration tests later, automated testing is something you should be comfortable with. Therefore, the tools you use (frameworks, libraries, platforms) must support automated tests, and this ability should be one of the aspects you consider when making technology decisions.

Fear no more: RxJava has excellent support for unit testing, despite the quite complex domain of asynchronous, event-driven architecture. Explicitness of time, combined with a focus on pure functions and function composition (well grounded in functional programming), greatly improves the testing experience.

VERIFYING EMITTED EVENTS

First, we need to define our goals for testing Observables. Given a method returning an Observable, we probably want to make sure of the following:

  • Events are emitted in the correct order
  • Errors are properly propagated
  • Various operators are composed together as predicted
  • Events appear at the right moments in time
  • Backpressure is supported

And much more. The first two requirements are simple and do not require any special support from RxJava. Basically, collect everything that was emitted and execute assertions using whichever library you prefer:

@Test
public void shouldApplyConcatMapInOrder() throws Exception {
    List<String> list = Observable
        .range(1, 3)
        .concatMap(x -> Observable.just(x, -x))
        .map(Object::toString)
        .toList()
        .toBlocking()
        .single();

    assertThat(list).containsExactly("1", "-1", "2", "-2", "3", "-3");
}

The preceding simple test case transforms an Observable into a List<String> by using the well-known toList().toBlocking().single() construct. Normally, an Observable is asynchronous, so to have predictable and fast tests, we must perform such a transformation. We can also easily assert onError() notifications when BlockingObservable is used. Exceptions are simply rethrown upon subscription. Notice that checked exceptions are wrapped in RuntimeExceptions, something only a good test can prove:

File file = new File("404.txt");
BlockingObservable<String> fileContents = Observable
    .fromCallable(() -> Files.toString(file, UTF_8))
    .toBlocking();

try {
    fileContents.single();
    failBecauseExceptionWasNotThrown(FileNotFoundException.class);
} catch (RuntimeException expected) {
    assertThat(expected)
        .hasCauseInstanceOf(FileNotFoundException.class);
}

The fromCallable() operator is handy when you want to lazily create an Observable that emits at most one element. It also takes care of error handling and backpressure, so you should prefer it over Observable.create() for one-element streams. You can use another type of unit test to verify your understanding of various operators and their behavior. For example, what does the concatMapDelayError() operator actually do? You are free to try it once, but having an automated test that everyone can read and quickly grasp is a great advantage:

Observable<Notification<Integer>> notifications = Observable
    .just(3, 0, 2, 0, 1, 0)
    .concatMapDelayError(x -> fromCallable(() -> 100 / x))
    .materialize();

List<Notification.Kind> kinds = notifications
    .map(Notification::getKind)
    .toList()
    .toBlocking()
    .single();

assertThat(kinds).containsExactly(OnNext, OnNext, OnNext, OnError);

With the standard concatMap(), the transformation of the second element (0) would fail and terminate the entire stream. However, we clearly see that our final stream has four elements: three OnNext notifications followed by OnError. Another assertion could show that the final values are indeed 33 (100 / 3), 50, and 100. This nicely explains how concatMapDelayError() works: if the transformation generates an error, it is not passed downstream immediately, and the operator continues. Only when the upstream source completes does the operator emit an onError notification carrying the errors it found along the way.

In this last test case, we could no longer convert Observable to List because it would throw an exception immediately. materialize() is useful in such cases: each kind of event (onNext, onCompleted, and onError) is wrapped in a homogeneous Notification object. These objects can later be examined, but this is tedious and not very readable. This is where TestSubscriber becomes handy:

Observable<Integer> obs = Observable
    .just(3, 0, 2, 0, 1, 0)
    .concatMapDelayError(x -> Observable.fromCallable(() -> 100 / x));

TestSubscriber<Integer> ts = new TestSubscriber<>();
obs.subscribe(ts);

ts.assertValues(33, 50, 100);
ts.assertError(ArithmeticException.class); //Fails (!)

The TestSubscriber class is quite simple: it stores all events and notifications it received internally so that we can later query it. TestSubscriber also provides a set of assertions that are very useful in a test case. All we need to do is create an instance of TestSubscriber, subscribe to an Observable-under-test, and examine the contents of it. Strangely, the preceding test actually fails. assertError() fails because we expect the stream to complete with ArithmeticException, whereas in reality we got CompositeException that aggregates all three ArithmeticExceptions found along the way. This is yet another reason why discovering operators by running them and testing automatically is quite useful.

TestSubscriber is extremely effective when working hand in hand with TestScheduler. A typical scenario involves interleaving assertions and advancing time to observe how events are flowing over time. Imagine that you have a service that returns an Observable. The details of its implementation are entirely irrelevant:

interface MyService {
    Observable<LocalDate> externalCall();
}

Rather than mixing different concerns, we decided to build a decorator over MyService that adds timeout functionality to whatever the underlying implementation of MyService is. For the reasons that you can probably guess by now, we also go the extra mile of externalizing the Scheduler used by the timeout() operator:

class MyServiceWithTimeout implements MyService {
    private final MyService delegate;
    private final Scheduler scheduler;

    MyServiceWithTimeout(MyService d, Scheduler s) {
        this.delegate = d;
        this.scheduler = s;
    }

    @Override
    public Observable<LocalDate> externalCall() {
        return delegate
            .externalCall()
            .timeout(1, TimeUnit.SECONDS,
                Observable.empty(),
                scheduler);
    }
}

The RxJava approach involves a synthetic, controlled clock that is entirely predictable. By artificially advancing time, we get tests that are 100 percent accurate but also extremely fast. First, we set up a mock of MyService (using Mockito) that can return any Observable:

import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

private MyServiceWithTimeout mockReturning(
        Observable<LocalDate> result,
        TestScheduler testScheduler) {
    MyService mock = mock(MyService.class);
    given(mock.externalCall()).willReturn(result);
    return new MyServiceWithTimeout(mock, testScheduler);
}

We will now write two unit tests. The first ensures that in case of an externalCall() that never finishes, we receive a timeout precisely after one second:

@Test
public void timeoutWhenServiceNeverCompletes() throws Exception {
    //given
    TestScheduler testScheduler = Schedulers.test();
    MyService mock = mockReturning(
        Observable.never(), testScheduler);
    TestSubscriber<LocalDate> ts = new TestSubscriber<>();

    //when
    mock.externalCall().subscribe(ts);

    //then
    testScheduler.advanceTimeBy(950, MILLISECONDS);
    ts.assertNoTerminalEvent();
    testScheduler.advanceTimeBy(100, MILLISECONDS);
    ts.assertCompleted();
    ts.assertNoValues();
}

The never() operator returns an Observable that never completes and never emits any value. This simulates MyService’s call that is painfully slow. Then, we make a sequence of two assertions. First, we advance time just before the timeout threshold (950 milliseconds) and make sure that the TestSubscriber did not yet complete or fail. After 100 more milliseconds — that is, after the timeout threshold — we assert that the stream completed (assertCompleted()) with no values (assertNoValues()). We can also take advantage of assertError().

The second test should ensure that the timeout does not kick in before the configured threshold:

@Test
public void valueIsReturnedJustBeforeTimeout() throws Exception {
    //given
    TestScheduler testScheduler = Schedulers.test();
    Observable<LocalDate> slow = Observable
        .timer(950, MILLISECONDS, testScheduler)
        .map(x -> LocalDate.now());
    MyService myService = mockReturning(slow, testScheduler);
    TestSubscriber<LocalDate> ts = new TestSubscriber<>();

    //when
    myService.externalCall().subscribe(ts);

    //then
    testScheduler.advanceTimeBy(930, MILLISECONDS);
    ts.assertNotCompleted();
    ts.assertNoValues();
    testScheduler.advanceTimeBy(50, MILLISECONDS);
    ts.assertCompleted();
    ts.assertValueCount(1);
}

advanceTimeBy() is equivalent to sleeping in a test while waiting for some action to happen, but without actually sleeping. You can test all sorts of operators like buffer(), sample(), and so on, as long as you meticulously allow passing a custom Scheduler. Speaking of schedulers, it is tempting to use Schedulers.immediate() as opposed to the standard ones. This Scheduler avoids concurrency by invoking all actions in the context of the caller thread. Such an approach works in some scenarios, but in general you should prefer TestScheduler because its use cases are far more versatile.

Following the dependency injection principle is very important. Otherwise, you will not be able to replace the various Schedulers with a test one. There are some techniques that can help you; for example, the RxJavaSchedulersHook plug-in. RxJava has a set of plug-ins that can globally alter the behavior of the library. RxJavaSchedulersHook, for example, can override the standard computation() Scheduler (and others) with a test one:

private final TestScheduler testScheduler = new TestScheduler();

@Before
public void alwaysUseTestScheduler() {
    RxJavaPlugins
        .getInstance()
        .registerSchedulersHook(new RxJavaSchedulersHook() {
            @Override
            public Scheduler getComputationScheduler() {
                return testScheduler;
            }

            @Override
            public Scheduler getIOScheduler() {
                return testScheduler;
            }

            @Override
            public Scheduler getNewThreadScheduler() {
                return testScheduler;
            }
        });
}

This global approach has many shortcomings. You can register an RxJavaSchedulersHook only once per JVM, so invoking this @Before method for the second time fails. You can work around this, but it becomes increasingly complex. Also, running unit tests in parallel becomes impossible (normally, unit tests are independent of one another, so this should not be an issue). Therefore, the only scalable solution for controlling time is explicitly passing TestScheduler whenever possible.

The one last thing that you can exercise with TestSubscriber is backpressure. In “Honoring the Requested Amount of Data”, we examined two implementations of an infinite Observable that produces subsequent natural numbers. One was using an old-fashioned raw Observable.create(), which does not support backpressure:

Observable<Long> naturals1() {
    return Observable.create(subscriber -> {
        long i = 0;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    });
}

The more advanced but recommended implementation fully supports backpressure:

Observable<Long> naturals2() {
    return Observable.create(
        SyncOnSubscribe.createStateful(
            () -> 0L,
            (cur, observer) -> {
                observer.onNext(cur);
                return cur + 1;
            }
    ));
}

From a functionality point of view, these two are the same: both are infinite, but you can, for example, take only a selected subset. However, with TestSubscriber we can easily unit-test whether a given Observable also supports backpressure:

TestSubscriber<Long> ts = new TestSubscriber<>(0);

naturals1()
    .take(10)
    .subscribe(ts);

ts.assertNoValues();
ts.requestMore(100);
ts.assertValueCount(10);
ts.assertCompleted();

The crucial part of this example is the TestSubscriber<>(0) constructor. Without it, TestSubscriber simply receives everything at the velocity dictated by the source. But if we request no data prior to subscription, TestSubscriber does not request any data from an Observable. This is the reason why we expect assertNoValues() to pass despite the source Observable clearly being able to emit 10 values. Later, we request as many as 100 items (just to be safe), but obviously the source Observable emits only 10, as many as it can possibly produce. This test fails for naturals1 almost immediately, and the following message appears:

AssertionError: No onNext events expected yet some received: 10

Our naive Observable, of course, knows to stop emitting events after 10 of them were received, despite being infinite. The take(10) operator eagerly unsubscribes, ending the internal while loop. However, naturals1 ignores the backpressure requests issued by TestSubscriber, so the latter receives items it never wanted. If you replace the source with naturals2, the test passes. This is another reason to avoid plain Observable.create() in favor of the built-in factories and SyncOnSubscribe.
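The request-driven contract that this test verifies is not specific to RxJava. As a rough illustration, the sketch below expresses the same idea with the JDK's own java.util.concurrent.Flow interfaces (Java 9 or later is assumed). RequestDrivenNaturals, naturals(), and CollectingSubscriber are hypothetical names; the publisher is deliberately simplistic (single-threaded, not reentrancy-safe) and is not how SyncOnSubscribe is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RequestDrivenNaturals {

    /** Emits 0, 1, 2, ... up to count items, but never more than was requested. */
    static Flow.Publisher<Long> naturals(long count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long next;       // next number to emit
            private boolean done;    // completed, failed, or cancelled

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                for (long sent = 0; sent < n && next < count && !done; sent++) {
                    subscriber.onNext(next++);
                }
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Plays the role of TestSubscriber<>(0): requests nothing until told to. */
    static final class CollectingSubscriber implements Flow.Subscriber<Long> {
        final List<Long> values = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Long item) { values.add(item); }
        @Override public void onError(Throwable t) { throw new AssertionError(t); }
        @Override public void onComplete() { completed = true; }

        void requestMore(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        CollectingSubscriber ts = new CollectingSubscriber();
        naturals(10).subscribe(ts);
        System.out.println(ts.values);   // nothing was requested yet
        ts.requestMore(100);
        System.out.println(ts.values + " completed=" + ts.completed);
    }
}
```

The producer never pushes on its own initiative; every onNext() is a direct answer to request(). A producer written like naturals1, with an unconditional loop, has no place to consult the requested amount, which is exactly what the failing assertion reveals.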

TestSubscriber has many other assertions. Some of them block waiting for completion; for example, awaitTerminalEvent(). Most of them, however, assert the state of the subscriber at the current moment, so that we can observe events flowing over time.

Monitoring and Debugging

Monitoring the behavior of various streams interacting with one another and troubleshooting when issues arise is a difficult subject in RxJava. As a matter of fact, every asynchronous event-driven system is inherently more difficult to troubleshoot compared to blocking architectures. When a synchronous operation fails, the exception flows all the way up the call stack, exposing the exact sequence of operations that caused it, from HTTP server, through all filters, aspects, business logic, and so on.

In an asynchronous system, the call stack is of limited use because when an event crosses the thread boundary, we no longer have the original call stack available. The same applies to distributed systems. This section gives you a few tips on how to make monitoring and debugging easier in applications using RxJava.

doOn…() Callbacks

Every Observable has a set of callback methods that you can use to peek into various events, namely:

  • doOnCompleted()
  • doOnEach()
  • doOnError()
  • doOnNext()
  • doOnRequest()
  • doOnSubscribe()
  • doOnTerminate()
  • doOnUnsubscribe()

What they all have in common is that they are not allowed to alter the state of an Observable in any way and they all return the same Observable, which makes them an ideal place to sprinkle some logging logic. For example, many newcomers forget that the code within Observable.create() is executed for each new Subscriber. This is important especially when a subscription triggers side effects like a network call. To detect such problems, it is a good practice to log every subscription to critical sources:

Observable<Instant> timestamps = Observable
    .fromCallable(() -> dbQuery())
    .doOnSubscribe(() -> log.info("subscribe()"));

timestamps
    .zipWith(timestamps.skip(1), Duration::between)
    .map(Object::toString)
    .subscribe(log::info);

The preceding program queries the database (dbQuery()) and retrieves some time series data in the form of Observable<Instant>. We would like to transform this stream a little bit by calculating the duration (using the Duration class from the java.time package) between each pair of consecutive Instants: first and second, second and third, and so on. One way to do this is to zip() the stream with itself shifted by one element. This way we tie together the first element with the second, the second with the third, and so on up to the end.

What we did not anticipate is that zipWith() actually subscribes to all of the underlying streams, effectively subscribing to the same timestamps Observable twice. You can discover this problem by observing that doOnSubscribe() is invoked twice. This leads to a duplicated database query.

Speaking of zip(), thanks to backpressure it no longer buffers the faster stream infinitely while waiting for the slower one to emit events. Instead, it asks for a fixed batch of values from each Observable, throwing MissingBackpressureException if it receives more:

.doOnSubscribe(() -> log.info("subscribe()"))
.doOnRequest(c -> log.info("Requested {}", c))
.doOnNext(instant -> log.info("Got: {}", instant));

doOnRequest() logs Requested 128, the value chosen by the zip operator. Even when the source is infinite or very large, we should see at most 128 messages such as Got: ... afterward from a well-behaving Observable. doOnNext() is another callback that we can take advantage of. Another useful operator that you should use fairly often is doOnError(). It invokes a callback every time an error notification flows from upstream. You cannot use doOnError() for any error handling; it is for logging only. It does not consume the error notification, which keeps propagating downstream:

Observable<String> obs = Observable
    .<String>error(new RuntimeException("Swallowed"))
    .doOnError(th -> log.warn("onError", th))
    .onErrorReturn(th -> "Fallback");

As clean as onErrorReturn() looks, it is very easy to swallow exceptions with it. It does provide the exception that we want to replace with a fallback value, but logging it is our responsibility. To keep functions small and composable, logging the error first in doOnError() and then silently handling the exception in the following line is a little bit more robust. Not logging an exception is rarely a good idea; it must be a deliberate decision, not an oversight.

Other operators are rather self-explanatory, with the possible exception of this pair:

doOnEach()

This is invoked for each Notification, namely onNext(), onCompleted(), and onError(). It can accept either a lambda invoked for each Notification or an Observer.

doOnTerminate()

This is invoked when either onCompleted() or onError() occurs. It is impossible to distinguish between them, so it might be better to use doOnCompleted() and doOnError() independently.

Measuring and Monitoring

Callbacks are not only useful for logging. Having various telemetric probes built into your application (like simple counters, timers, distribution histograms, and so on) and available externally can greatly reduce troubleshooting time as well as give great insight into what an application is doing. There are many libraries that simplify collecting and publishing metrics, one of them being Dropwizard metrics. Before you begin using this library, you need to do a little bit of setup:

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import org.slf4j.LoggerFactory;

MetricRegistry metricRegistry = new MetricRegistry();
Slf4jReporter reporter = Slf4jReporter
    .forRegistry(metricRegistry)
    .outputTo(LoggerFactory.getLogger(SomeClass.class))
    .build();
reporter.start(1, TimeUnit.SECONDS);

MetricRegistry is a factory for various metrics. Additionally, we set up a Slf4jReporter that will push a current snapshot of statistics to a given SLF4J logger. Other reporters publishing to Graphite and Ganglia are available. With this basic setup in place, you can begin monitoring your streams.

One of the simplest metrics you can think of is a Counter that can be incremented or decremented. You can use it to measure the number of events that flowed through the stream:

final Counter items = metricRegistry.counter("items");
observable
    .doOnNext(x -> items.inc())
    .subscribe(...);

After you subscribe to this Observable, the Counter will begin showing how many items have been generated so far. This information becomes even more useful when you publish it to an external monitoring server like Graphite and put it on a chart over time.

Another important metric that you might want to capture is how many items are being concurrently processed right at that moment. For example, flatMap() can easily spin up hundreds or more concurrent Observables and subscribe to all of them. Knowing how many such Observables we have (think about open database connections, web sockets, and so on) can give significant insight into the system:

Observable<Long> makeNetworkCall(long x) {
    //...
}

Counter counter = metricRegistry.counter("counter");
observable
    .doOnNext(x -> counter.inc())
    .flatMap(this::makeNetworkCall)
    .doOnNext(x -> counter.dec())
    .subscribe(...);

When an event appears from upstream, we increment the counter. When an event appears after flatMap() (which means one of the asynchronous operations just emitted something), we decrement it. In an idle system, the counter is always zero, but when an upstream observable produces a lot of events and makeNetworkCall() is relatively slow, this counter skyrockets, clearly indicating where the bottleneck is.

The preceding example assumes that makeNetworkCall() always returns just one item and never fails (never completes with onError()). If instead you want the counter to cover the span between subscription to the internal Observable (when the work actually began) and its termination, whether successful or not, it is equally straightforward:

observable
    .flatMap(x ->
        makeNetworkCall(x)
            .doOnSubscribe(counter::inc)
            .doOnTerminate(counter::dec)
    )
    .subscribe(...);

One of the most complex metrics is Timer, which measures the duration between two points in time. I cannot overstate the value of such a metric — we can measure network call latency, database query time, user response time, and much more. The way we typically measure time is by taking a snapshot of the current time, doing some lengthy operation, and then noting the difference between the time now and then. This is encapsulated in the Metrics library like this:

import com.codahale.metrics.Timer;
Timer timer = metricRegistry.timer("timer");
Timer.Context ctx = timer.time();
//some lengthy operation...
ctx.stop();
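The snapshot-and-difference idea itself fits in a few lines. The sketch below is not how the Metrics library implements Timer; ManualStopwatch is a made-up class whose only twist is that the time source is injected as a LongSupplier, so that a test can substitute a fake one, in line with the explicit-time theme of this chapter.

```java
import java.util.function.LongSupplier;

/** Hypothetical stopwatch: remembers "then" and subtracts it from "now". */
final class ManualStopwatch {
    private final LongSupplier nanoSource;
    private final long startedAt;

    private ManualStopwatch(LongSupplier nanoSource) {
        this.nanoSource = nanoSource;
        this.startedAt = nanoSource.getAsLong();   // snapshot of the start time
    }

    static ManualStopwatch start(LongSupplier nanoSource) {
        return new ManualStopwatch(nanoSource);
    }

    long elapsedNanos() {
        return nanoSource.getAsLong() - startedAt;
    }

    public static void main(String[] args) throws InterruptedException {
        ManualStopwatch watch = ManualStopwatch.start(System::nanoTime);
        Thread.sleep(20);                          // some lengthy operation...
        System.out.println("Took " + watch.elapsedNanos() / 1_000_000 + " ms");
    }
}
```

In production you would pass System::nanoTime; in a test, any counter that you advance by hand gives exact, repeatable durations.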

The API keeps the operation start time encapsulated in Timer.Context and assumes that the code we are benchmarking is blocking. But what if we want to measure the time between subscription to an Observable over which we have no control and its termination? doOnSubscribe() and doOnTerminate() are insufficient here because we cannot pass Timer.Context between them. Luckily, RxJava is flexible enough to tackle this problem anyway with one extra layer of composition:

Observable<Long> external = //...
Timer timer = metricRegistry.timer("timer");
Observable<Long> externalWithTimer = Observable
    // A fresh Timer.Context is created for every subscriber
    .defer(() -> Observable.just(timer.time()))
    .flatMap(timerCtx ->
        // doOnTerminate() fires on completion and on error alike
        external.doOnTerminate(timerCtx::stop));

We use a little trick. First, we lazily start the timer with the help of the defer() operator. This way, the timer starts exactly when subscription happens. Later, we (in a way) replace the Timer.Context instance with the actual Observable that we want to benchmark (external). However, before we return the external Observable, we attach a callback to it that stops our running timer when it finishes. You can use this technique to measure the time between subscription and termination of any Observable over which you have no control.
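
The two ingredients of this trick, starting the clock lazily for every subscriber and stopping it on termination, can also be sketched without RxJava. The following is an analogy only: a Supplier of CompletableFuture stands in for a cold Observable, calling get() stands in for subscribing, and DeferredTimingSketch, timed(), clock, and recorder are hypothetical names introduced here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper for illustration; an analogy to defer() + flatMap(), not RxJava itself.
final class DeferredTimingSketch {

    private DeferredTimingSketch() {
    }

    // Wraps a lazy asynchronous source so that every get() is timed on its own.
    static <T> Supplier<CompletableFuture<T>> timed(
            Supplier<CompletableFuture<T>> source,
            LongSupplier clock,
            LongConsumer recorder) {
        return () -> {
            // The start time is taken per invocation, not when timed() is called;
            // this mirrors what defer() achieves for each subscription.
            long start = clock.getAsLong();
            // whenComplete() runs on success and on failure, so both are recorded.
            return source.get()
                    .whenComplete((value, error) ->
                            recorder.accept(clock.getAsLong() - start));
        };
    }
}
```

Nothing is measured until get() is called, and each call gets its own start time, just as each subscriber to externalWithTimer gets its own Timer.Context.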

Summary

Every reactive library or framework, due to its asynchronous and event-driven nature, is challenging when it comes to debugging and troubleshooting. RxJava is no exception here, but it provides a handful of tools that make life easier for developers and operations staff.

  • First, RxJava embraces errors and makes them easy to handle and manage
  • Second, it provides facilities for monitoring and debugging streams in real time
  • Finally, it has excellent unit-testing support

As a matter of fact, being able to take full control over the system clock is immensely useful for testing time-sensitive operators. RxJava can be difficult to troubleshoot at first. Yet it provides a clear API and a strict contract, as opposed to superficially simpler blocking code that has hidden race conditions and poor throughput.