Reactive Programming with RxJava

Foreword

After many false starts it finally dawned on us that by dualizing the Iterable/Iterator interface for synchronous collections, we could obtain a pair of interfaces to represent asynchronous event streams, with all the familiar sequence operators such as map, filter, scan, zip, groupBy, etc. for transforming and combining asynchronous data streams, and thus Rx was born somewhere in the summer of 2007.

Fast forward to 2016 and the popularity and use of Rx has skyrocketed. All traffic through the Netflix API relies upon RxJava, as does the Hystrix fault-tolerance library that bulkheads all internal service traffic, and via related reactive libraries RxNetty and Mantis, Netflix is now creating a completely reactive network stack for connecting all internal services across machine and process boundaries.

RxJava is also extremely successful in the Android space, with companies like SoundCloud, Square, NYT, and SeatGeek all using RxJava for their Android apps and contributing to the RxAndroid extension library. NoSQL vendors such as Couchbase and Splunk also offer Rx-based bindings to their data access layer.

The original implementation of Rx in .NET focussed squarely on transforming asynchronous event streams, and used asynchronous enumerable for scenarios that needed back pressure. Since Java does not have language support for async await, the community extended the Observer and Observable types with the concept of reactive pull and introduced the Producer interface.

Reactive Programming with RxJava

RxJava is a specific implementation of reactive programming for Java and Android that is influenced by functional programming. It favors function composition, avoidance of global state and side effects, and thinking in streams to compose asynchronous and event-based programs. It begins with the observer pattern of producer/consumer callbacks and extends it with dozens of operators that allow composing, transforming, scheduling, throttling, error handling, and lifecycle management.

Reactive Programming and RxJava

Reactive programming is a general programming term that is focused on reacting to changes, such as data values or events. It can be, and often is, done imperatively. A callback is an approach to reactive programming done imperatively. A spreadsheet is a great example of reactive programming: cells dependent on other cells automatically “react” when those other cells change.

Reactive-functional programming therefore is an approach to programming—an abstraction on top of imperative systems—that allows us to program asynchronous and event-driven use cases without having to think like the computer itself and imperatively define the complex interactions of state, particularly across thread and network boundaries.

So, the short answer to what reactive-functional programming is solving is concurrency and parallelism. More colloquially, it is solving callback hell, which results from addressing reactive and asynchronous use cases in an imperative way.

When You Need Reactive Programming

Reactive programming is useful in scenarios such as the following:

  • Processing user events such as mouse movement and clicks, keyboard typing, GPS signals changing over time as users move with their device, device gyroscope signals, touch events, and so on
  • Responding to and processing any and all latency-bound IO events from disk or network, given that IO is inherently asynchronous
  • Handling events or data pushed at an application by a producer it cannot control

If the code in question is handling only one event stream, reactive-imperative programming with a callback is going to be fine, and bringing in reactive-functional programming is not going to give you much benefit. If your program is like most though, you need to combine events (or asynchronous responses from functions or network calls), have conditional logic interacting between them, and must handle failure scenarios and resource cleanup on any and all of them. This is where the reactive-imperative approach begins to dramatically increase in complexity and reactive-functional programming begins to shine.

Hence this is where the tagline for Reactive Extensions (Rx) in general and RxJava specifically comes from, “a library for composing asynchronous and event-based programs.” RxJava is a concrete implementation of reactive programming principles influenced by functional and data-flow programming.

How RxJava Works

Push versus Pull

The entire point of RxJava being reactive is to support push, so the Observable and related Observer type signatures support events being pushed at it. This in turn generally is accompanied by asynchrony, which is discussed in the next section. But the Observable type also supports an asynchronous feedback channel (also sometimes referred to as async-pull or reactive-pull), as an approach to flow control or backpressure in async systems.

interface Observable<T> {
    Subscription subscribe(Observer<T> s);
}

Upon subscription, the Observer can have three types of events pushed to it:

interface Observer<T> {
    void onNext(T t);
    void onError(Throwable t);
    void onCompleted();
}

The onNext() method might never be called or might be called once, many, or infinite times. The onError() and onCompleted() are terminal events, meaning that only one of them can be called and only once. When a terminal event is called, the Observable stream is finished and no further events can be sent over it. Terminal events might never occur if the stream is infinite and does not fail.
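
The terminal-event rule can be made concrete with a small wrapper that refuses to forward anything once onError() or onCompleted() has been seen. The following is a minimal sketch in plain Java, not RxJava's own code: SimpleObserver, TerminalGuard, and TerminalGuardDemo are hypothetical names invented for illustration, and the sketch assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; every name here is hypothetical and not part of RxJava.
class TerminalGuardDemo {

    // Same three callbacks as the Observer interface shown in the text.
    interface SimpleObserver<T> {
        void onNext(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    // Forwards events downstream and drops anything that arrives after a terminal event.
    static final class TerminalGuard<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean done; // single-threaded sketch, so no synchronization

        TerminalGuard(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T t) {
            if (!done) downstream.onNext(t);
        }

        @Override public void onError(Throwable t) {
            if (!done) {
                done = true;
                downstream.onError(t);
            }
        }

        @Override public void onCompleted() {
            if (!done) {
                done = true;
                downstream.onCompleted();
            }
        }
    }

    // Pushes a misbehaving event sequence through the guard and records what gets through.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleObserver<String> recorder = new SimpleObserver<String>() {
            @Override public void onNext(String s) { log.add("next:" + s); }
            @Override public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        };
        SimpleObserver<String> guarded = new TerminalGuard<>(recorder);
        guarded.onNext("a");
        guarded.onNext("b");
        guarded.onCompleted();
        guarded.onNext("c");                                // dropped: stream already finished
        guarded.onError(new IllegalStateException("late")); // dropped: only one terminal event
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:a, next:b, completed]
    }
}
```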

There is an additional type of signature to permit interactive pull:

interface Producer {
    void request(long n);
}

This is used with a more advanced Observer called Subscriber:

// Simplified view; in RxJava 1.x, Subscriber is an abstract class
interface Subscriber<T> extends Observer<T>, Subscription {
    void onNext(T t);
    void onError(Throwable t);
    void onCompleted();
    //...
    void unsubscribe();
    void setProducer(Producer p);
}
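
To give a feel for what request(n) buys the consumer, here is a minimal plain-Java sketch of the idea: the producer holds on to its data and emits only as many items as have been requested. SimpleProducer, IteratorProducer, and RequestDemo are hypothetical names, and the sketch is synchronous and single-threaded; it illustrates the shape of reactive pull, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch; every name here is hypothetical and not part of RxJava.
class RequestDemo {

    // Mirrors the shape of Producer.request(long n) from the text.
    interface SimpleProducer {
        void request(long n);
    }

    // Emits items from an iterator, but never more than the consumer asked for.
    static final class IteratorProducer<T> implements SimpleProducer {
        private final Iterator<T> source;
        private final Consumer<T> onNext;

        IteratorProducer(Iterator<T> source, Consumer<T> onNext) {
            this.source = source;
            this.onNext = onNext;
        }

        @Override public void request(long n) {
            for (long i = 0; i < n && source.hasNext(); i++) {
                onNext.accept(source.next());
            }
        }
    }

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        IteratorProducer<Integer> producer =
                new IteratorProducer<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), received::add);
        producer.request(2); // consumer signals it can take two items
        producer.request(1); // later it has room for one more
        return received;     // items 4 and 5 stay with the producer
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```

The consumer controls the pace here: nothing is pushed until it says how much it can handle, which is the essence of backpressure.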

Async versus Sync

Generally, an Observable is going to be asynchronous, but it doesn’t need to be. An Observable can be synchronous, and in fact defaults to being synchronous. RxJava never adds concurrency unless it is asked to do so.

A synchronous Observable would be subscribed to, emit all data using the subscriber’s thread, and complete (if finite). An Observable backed by blocking network I/O would synchronously block the subscribing thread and then emit via onNext() when the blocking network I/O returned.

For example, the following is completely synchronous:

Observable.create(s -> {
    s.onNext("Hello World!");
    s.onCompleted();
}).subscribe(hello -> System.out.println(hello));

The RxJava Observable is purposefully agnostic with regard to async versus sync, and whether concurrency exists or where it comes from. This is by design and allows the implementation of the Observable to decide what is best.

In-memory data

If data exists in a local in-memory cache (with constant microsecond/nanosecond lookup times), it does not make sense to pay the scheduling cost to make it asynchronous. The Observable can just fetch the data synchronously and emit it on the subscribing thread, as shown here:

Observable.create(s -> {
    s.onNext(cache.get(SOME_KEY));
    s.onCompleted();
}).subscribe(value -> System.out.println(value));

This scheduling choice is powerful when the data might or might not be in memory. If it is in memory, emit it synchronously; if it’s not, perform the network call asynchronously and return the data when it arrives. This choice can reside conditionally within the Observable:

Observable.create(s -> {
    T fromCache = getFromCache(SOME_KEY);
    if (fromCache != null) {
        // emit synchronously
        s.onNext(fromCache);
        s.onCompleted();
    } else {
        // fetch asynchronously
        getDataAsynchronously(SOME_KEY)
            .onResponse(v -> {
                putInCache(SOME_KEY, v);
                s.onNext(v);
                s.onCompleted();
            })
            .onFailure(exception -> {
                s.onError(exception);
            });
    }
}).subscribe(s -> System.out.println(s));

Synchronous computation (such as operators)

The more common reason for remaining synchronous is stream composition and transformation via operators. RxJava usage consists mostly of its large API of operators for manipulating, combining, and transforming data, such as map(), filter(), take(), flatMap(), and groupBy(). Most of these operators are synchronous, meaning that they perform their computation synchronously inside the onNext() as the events pass by.

Observable<Integer> o = Observable.create(s -> {
    s.onNext(1);
    s.onNext(2);
    s.onNext(3);
    s.onCompleted();
});
o.map(i -> "Number " + i).subscribe(s -> System.out.println(s));

The important thing to understand here is that most Observable function pipelines are synchronous (unless a specific operator needs to be async, such as timeout or observeOn), whereas the Observable itself can be async.

Observable.<Integer>create(s -> {
    // ... async subscription and data emission ...
})
.doOnNext(i -> System.out.println(Thread.currentThread()))
.filter(i -> i % 2 == 0)
.map(i -> "Value " + i + " processed on " + Thread.currentThread())
.subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted");

In this example, the Observable is async (it emits on a thread different from that of the subscriber), so subscribe is nonblocking, and the println at the end will output before events are propagated and the “SOME VALUE =>” output is shown.
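
The ordering described here can be reproduced without RxJava. The following plain-Java sketch uses a hypothetical subscribe helper that starts a thread and returns immediately. The CountDownLatch gate stands in for real latency and is there only to make the ordering deterministic; all names are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Plain-Java sketch; every name here is hypothetical and not part of RxJava.
class AsyncSubscribeDemo {

    // "Subscribes" by starting a worker thread, then returns to the caller right away.
    static void subscribe(Consumer<String> onNext, CountDownLatch gate, CountDownLatch done) {
        new Thread(() -> {
            try {
                gate.await();           // stand-in for network or disk latency
                onNext.accept("value"); // emitted on the worker thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        }, "emitter").start();
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        subscribe(v -> log.add(v + " on " + Thread.currentThread().getName()), gate, done);
        log.add("after subscribe"); // the caller was not blocked by subscribe
        gate.countDown();           // now let the emission happen
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [after subscribe, value on emitter]
    }
}
```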

Concurrency and Parallelism

Individual Observable streams permit neither concurrency nor parallelism. Instead, they are achieved via composition of async Observables.

The contract of an RxJava Observable is that events (onNext(), onCompleted(), onError()) can never be emitted concurrently. In other words, a single Observable stream must always be serialized and thread-safe. Each event can be emitted from a different thread, as long as the emissions are not concurrent. This means no interleaving or simultaneous execution of onNext(). If onNext() is still being executed on one thread, another thread cannot begin invoking it again (interleaving).

Here’s an example of what’s okay:

Observable.create(s -> {
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
        s.onNext("three");
        s.onNext("four");
        s.onCompleted();
    }).start();
});

Here’s an example of code that is illegal:

// DO NOT DO THIS
Observable.create(s -> {
    // Thread A
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
    }).start();
    // Thread B
    new Thread(() -> {
        s.onNext("three");
        s.onNext("four");
    }).start();
    // ignoring need to emit s.onCompleted() due to race of threads
});
// DO NOT DO THIS

A single Observable stream is always serialized, but each Observable stream can operate independently of one another, and thus concurrently and/or in parallel.

Here is a contrived example showing the mechanics of two asynchronous Observables running on separate threads and merged together:

Observable<String> a = Observable.create(s -> {
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
        s.onCompleted();
    }).start();
});
Observable<String> b = Observable.create(s -> {
    new Thread(() -> {
        s.onNext("three");
        s.onNext("four");
        s.onCompleted();
    }).start();
});
// this subscribes to a and b concurrently,
// and merges into a third sequential stream
Observable<String> c = Observable.merge(a, b);

Observable c will receive items from both a and b, and due to their asynchrony, three things occur:

  • “one” will appear before “two”
  • “three” will appear before “four”
  • The order between one/two and three/four is unspecified
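
These three guarantees can be checked with a plain-Java sketch of the same mechanics: two threads each push their own items in order, and a single serialized entry point ensures deliveries never overlap. The sketch uses a lock for simplicity; that is an assumption made for illustration, and RxJava's own merge is implemented differently. MergeDemo, source, and serializedOnNext are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch; every name here is hypothetical and not part of RxJava.
class MergeDemo {

    // Starts a thread that pushes the given items, in order, to the consumer.
    static Thread source(Consumer<String> onNext, String... items) {
        Thread t = new Thread(() -> {
            for (String item : items) {
                onNext.accept(item);
            }
        });
        t.start();
        return t;
    }

    static List<String> run() {
        List<String> merged = new ArrayList<>();
        Object lock = new Object();
        // Serialized entry point: only one thread at a time may deliver an item.
        Consumer<String> serializedOnNext = item -> {
            synchronized (lock) {
                merged.add(item);
            }
        };
        Thread a = source(serializedOnNext, "one", "two");
        Thread b = source(serializedOnNext, "three", "four");
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Per-source order is preserved; the interleaving between sources varies per run.
        System.out.println(run());
    }
}
```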

So why not just allow onNext() to be invoked concurrently?

Primarily because onNext() is meant for us humans to use, and concurrency is difficult. If onNext() could be invoked concurrently, it would mean that every Observer would need to code defensively for concurrent invocation, even when not expected or wanted.

A second reason is that some operations just aren’t possible with concurrent emission; for example, scan and reduce, which are common and important behaviors. Operators such as scan and reduce require sequential event propagation so that state can be accumulated on streams of events that are not both associative and commutative. Allowing concurrent Observable streams (with concurrent onNext()) would limit the types of events that can be processed and require thread-safe data structures.
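
String concatenation is a handy example of an accumulation that is associative but not commutative, so the order of arrival changes the result. The sketch below is a plain-Java stand-in for a scan without a seed, written over a List rather than an Observable; ScanDemo and its scan method are hypothetical helpers, not the RxJava operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java sketch; ScanDemo.scan is a hypothetical helper, not the RxJava operator.
class ScanDemo {

    // Returns every intermediate accumulation; the first event is passed through as-is.
    static <T> List<T> scan(List<T> events, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T state = null;
        boolean first = true;
        for (T event : events) {
            state = first ? event : accumulator.apply(state, event);
            first = false;
            out.add(state);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same events, different arrival order, different accumulated state.
        System.out.println(scan(Arrays.asList("a", "b", "c"), String::concat)); // prints [a, ab, abc]
        System.out.println(scan(Arrays.asList("b", "a", "c"), String::concat)); // prints [b, ba, bac]
    }
}
```

If "a" and "b" could arrive concurrently, nothing would determine which of the two results is the right one, which is why sequential propagation matters for this kind of operator.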

A third reason is that performance is affected by synchronization overhead because all observers and operators would need to be thread-safe, even if most of the time data arrives sequentially. Despite the JVM often being good at eliminating synchronization overhead, it is not always possible (particularly with nonblocking algorithms using atomics) so this ends up being a performance tax not needed on sequential streams.

Additionally, it is often slower to do generic fine-grained parallelism. Parallelism typically needs to be done coarsely, such as in batches of work, to make up for the overhead of switching threads, scheduling work, and recombining.

Lazy versus Eager

The Observable type is lazy, meaning it does nothing until it is subscribed to. This differs from an eager type such as a Future, which when created represents active work. Laziness allows composing Observables together without losing data to race conditions and without resorting to caching.

In practice, this means two things:

  • Subscription, not construction, starts work

Due to the laziness of an Observable, creating one does not actually cause any work to happen (ignoring the “work” of allocating the Observable object itself). All it does is define what work should be done when it is eventually subscribed to. Consider an Observable defined like this:

Observable<T> someData = Observable.create(s -> {
    getDataFromServerWithCallback(args, data -> {
        s.onNext(data);
        s.onCompleted();
    });
});

The someData reference now exists, but getDataFromServerWithCallback is not yet being executed. All that has happened is that the Observable wrapper has been declared around a unit of work to be performed, the function that lives inside the Observable.

Subscribing to the Observable causes the work to be done:

someData.subscribe(s -> System.out.println(s));

This lazily executes the work represented by the Observable.

  • Observables can be reused

Because the Observable is lazy, it also means a particular instance can be invoked more than once. Continuing with the previous example this means we can do the following:

someData.subscribe(s -> System.out.println("Subscriber 1: " + s));
someData.subscribe(s -> System.out.println("Subscriber 2: " + s));

Now there will be two separate subscriptions, each calling getDataFromServerWithCallback and emitting events.
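
The difference from an eager type can be seen by counting how often the underlying work runs. This plain-Java sketch contrasts a hypothetical LazySource (a stand-in for a lazy Observable, built on a Supplier) with a CompletableFuture started via supplyAsync. The names and the counters are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java sketch; LazySource is a hypothetical stand-in for a lazy Observable.
class LazyDemo {

    // Holds a recipe for the work and runs it once per subscription.
    static final class LazySource<T> {
        private final Supplier<T> work;

        LazySource(Supplier<T> work) {
            this.work = work;
        }

        void subscribe(Consumer<T> onNext) {
            onNext.accept(work.get());
        }
    }

    // Returns {lazy calls before subscribing, lazy calls after two subscriptions, eager calls}.
    static int[] run() {
        AtomicInteger lazyCalls = new AtomicInteger();
        AtomicInteger eagerCalls = new AtomicInteger();

        LazySource<String> lazy = new LazySource<>(() -> "call " + lazyCalls.incrementAndGet());
        int lazyBeforeSubscribe = lazyCalls.get(); // construction alone did no work

        // Eager: the work is scheduled as soon as the future is created.
        CompletableFuture<String> eager =
                CompletableFuture.supplyAsync(() -> "call " + eagerCalls.incrementAndGet());
        eager.join();
        eager.join(); // reading the result again does not rerun the work

        lazy.subscribe(s -> { });
        lazy.subscribe(s -> { }); // each subscription reruns the work

        return new int[] { lazyBeforeSubscribe, lazyCalls.get(), eagerCalls.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 0 2 1
    }
}
```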

This laziness is powerful when doing composition. For example:

someData
    .onErrorResumeNext(lazyFallback)
    .subscribe(s -> System.out.println(s));

In this case, the lazyFallback Observable represents work that can be done, but it will be done only if something subscribes to it, and we want it subscribed to only if someData fails. Of course, eager types can be made lazy by using function calls (such as getDataAsFutureA()).

Duality

An Rx Observable is the async “dual” of an Iterable. By “dual,” we mean the Observable provides all the functionality of an Iterable except in the reverse flow of data: it is push instead of pull.

Pull (Iterable)      Push (Observable)
T next()             onNext(T)
throws Exception     onError(Throwable)
returns              onCompleted()

The fact that it behaves as a dual effectively means anything you can do synchronously via pull with an Iterable and Iterator can be done asynchronously via push with an Observable and Observer. This means that the same programming model can be applied to both!

For example, as of Java 8 an Iterable can be upgraded to have function composition via the java.util.stream.Stream type to work like this:

// Iterable<String> as Stream<String>
// that contains 75 strings
getDataFromLocalMemorySynchronously()
    .skip(10)
    .limit(5)
    .map(s -> s + "_transformed")
    .forEach(System.out::println);

An RxJava Observable is used the same way:

// Observable<String>
// that emits 75 strings
getDataFromNetworkAsynchronously()
    .skip(10)
    .take(5)
    .map(s -> s + "_transformed")
    .subscribe(System.out::println);

In other words, the Rx Observable allows programming with async data via push just like Streams around Iterables and Lists using synchronous pull.
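
The reversal of data flow can be shown side by side in a few lines of plain Java. In this sketch (hypothetical names, synchronous for simplicity), pull has the consumer calling next(), whereas push has the producer calling the consumer's callbacks. Both see the same values in the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch; every name here is hypothetical and not part of RxJava.
class DualityDemo {

    // Pull: the consumer drives, asking for each value with next().
    static List<String> pull(Iterable<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    // Push: the producer drives, invoking the consumer's callbacks when it has data.
    static void push(Iterable<String> data, Consumer<String> onNext, Runnable onCompleted) {
        for (String value : data) {
            onNext.accept(value);
        }
        onCompleted.run();
    }

    static List<String> runPush(Iterable<String> data) {
        List<String> seen = new ArrayList<>();
        push(data, seen::add, () -> seen.add("<completed>"));
        return seen;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("x", "y");
        System.out.println(pull(data));    // prints [x, y]
        System.out.println(runPush(data)); // prints [x, y, <completed>]
    }
}
```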

Cardinality

The Observable supports asynchronously pushing multiple values. This nicely fits into the lower right of the following table, the async dual of Iterable (or Stream, List, Enumerable, etc.) and multivalued version of a Future.

         One                   Many
Sync     T getData()           Iterable<T> getData()
Async    Future<T> getData()   Observable<T> getData()

So, why might an Observable be valuable instead of just Future? The most obvious reason is that you are dealing with either an event stream or a multivalued response. The less obvious reason is composition of multiple single-valued responses.

Event stream

Event stream is straightforward. Over time the producer pushes events at the consumer, as demonstrated here:

// producer
Observable<Event> mouseEvents = ...;
// consumer
mouseEvents.subscribe(e -> doSomethingWithEvent(e));

This doesn’t work very well with a Future:

// producer
Future<Event> mouseEvents = ...;
// consumer
mouseEvents.onSuccess(e -> doSomethingWithEvent(e));

The onSuccess callback could have received the “last event,” but some questions remain: Does the consumer now need to poll? Will the producer enqueue them? Or will they be lost in between each fetch? The Observable is definitely beneficial here. In the absence of Observable, a callback approach would be better than modeling this with a Future.

Multiple values

Multivalued responses are the next use of Observable. Basically, anywhere that a List, Iterable, or Stream would be used, Observable can be used instead:

// producer
Observable<Friend> friends = ...;
// consumer
friends.subscribe(friend -> sayHello(friend));

Now, this can work with a Future, like this:

// producer
Future<List<Friend>> friends = ...;
// consumer
friends.onSuccess(listOfFriends -> {
    listOfFriends.forEach(friend -> sayHello(friend));
});

So why use the Observable<Friend> approach?

The most compelling reason is that items can be processed as received rather than waiting for the entire collection to arrive. This is particularly true when different network latencies on the backend can affect each item differently, which is actually fairly common due to long-tail latencies (such as in service-oriented or microservice architectures) and shared data stores.

Composition

A multivalued Observable type is also useful when composing single-valued responses, such as from Futures.

When merging together multiple Futures, they emit another Future with a single value, such as this:

CompletableFuture<String> f1 = getDataAsFuture(1);
CompletableFuture<String> f2 = getDataAsFuture(2);
CompletableFuture<String> f3 = f1.thenCombine(f2, (x, y) -> {
    return x+y;
});

That might be exactly what is wanted, and is actually available in RxJava via Observable.zip:

Observable<String> o1 = getDataAsObservable(1);
Observable<String> o2 = getDataAsObservable(2);
Observable<String> o3 = Observable.zip(o1, o2, (x, y) -> {
    return x+y;
});

However, it means waiting until all Futures are completed before emitting anything. Oftentimes, it is preferable to emit each returned Future value as it completes. In this case, Observable.merge (or the related flatMap) is preferable. It allows composing the results (even if each is just an Observable emitting one value) into a stream of values that are each emitted as soon as they are ready:

Observable<String> o1 = getDataAsObservable(1);
Observable<String> o2 = getDataAsObservable(2);
// o3 is now a stream of o1 and o2 that emits each item without waiting
Observable<String> o3 = Observable.merge(o1, o2);
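
The timing difference between the two styles can be observed with JDK futures alone. In this sketch the futures are completed by hand, with the second one finishing first, so the outcome is deterministic. The per-future callbacks play the role of merge and thenCombine plays the role of zip; that mapping is an analogy made for illustration, not RxJava behavior, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only sketch; the merge/zip mapping is an analogy, not RxJava code.
class CombineVersusMergeDemo {

    static String run() {
        CompletableFuture<String> f1 = new CompletableFuture<>();
        CompletableFuture<String> f2 = new CompletableFuture<>();

        // Merge-like: handle each value as soon as its own future completes.
        List<String> asTheyArrive = new ArrayList<>();
        f1.thenAccept(asTheyArrive::add);
        f2.thenAccept(asTheyArrive::add);

        // Zip-like: a single combined value that needs both inputs.
        CompletableFuture<String> combined = f1.thenCombine(f2, (x, y) -> x + y);

        f2.complete("B"); // the second request happens to finish first
        List<String> afterFirstCompletion = new ArrayList<>(asTheyArrive);
        boolean combinedReadyEarly = combined.isDone(); // still waiting on f1

        f1.complete("A");

        return afterFirstCompletion + " " + combinedReadyEarly + " "
                + asTheyArrive + " " + combined.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [B] false [B, A] AB
    }
}
```

The merge-like path has already delivered "B" while the combined result is still pending, which is the behavior the text describes as emitting each item without waiting.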

Single

Now, despite Rx Observable being great at handling multivalued streams, the simplicity of a single-valued representation is very nice for API design and consumption. Additionally, basic request/response behavior is extremely common in applications. For this reason, RxJava provides a Single type, which is a lazy equivalent to a Future.

For example, consider these accessors:

public static Single<String> getDataA() {
    return Single.<String>create(o -> {
        o.onSuccess("DataA");
    }).subscribeOn(Schedulers.io());
}
public static Single<String> getDataB() {
    return Single.just("DataB")
        .subscribeOn(Schedulers.io());
}

These can then be used and optionally composed like this:

// merge a & b into an Observable stream of 2 values
Observable<String> a_merge_b = getDataA().mergeWith(getDataB());

Note how two Singles are merged into an Observable. This could result in an emission of [A, B] or [B, A], depending on which completes first.

Using Single instead of Observable to represent a “stream of one” simplifies consumption because a developer must consider only the following behaviors for the Single type:

  • Respond with an error
  • Never respond
  • Respond with a success

Compare this with the additional states a consumer must consider with an Observable:

  • Respond with an error
  • Never respond
  • Respond successfully with no data and terminate
  • Respond successfully with a single value and terminate
  • Respond successfully with multiple values and terminate
  • Respond successfully with one or more values and never terminate (waiting for more data)

Completable

In addition to Single, RxJava also has a Completable type that addresses the surprisingly common use case of having no return type, just the need to represent successful or failed completion. Often Observable<Void> or Single<Void> ends up being used. This is awkward, so Completable came to be, as demonstrated here:

Completable c = writeToDatabase("data");

The Completable itself is an abstraction for two callbacks, completion and failure, like this:

static Completable writeToDatabase(Object data) {
    return Completable.create(s -> {
        doAsyncWrite(data,
            // callback for successful completion
            () -> s.onCompleted(),
            // callback for failure with Throwable
            error -> s.onError(error));
    });
}

Zero to infinity

Observable can support cardinalities from zero to infinity (which is explored more in “Infinite Streams”). But for simplicity and clarity, Single is an “Observable of One,” and Completable is an “Observable of None.”

               Zero                        One                   Many
Synchronous    void doSomething()          T getData()           Iterable<T> getData()
Asynchronous   Completable doSomething()   Single<T> getData()   Observable<T> getData()

Mechanical Sympathy: Blocking versus Nonblocking I/O

Thus far, the argument for the reactive-functional style of programming has primarily been about providing an abstraction over async callbacks to allow more manageable composition. And, it is fairly obvious that performing unrelated network requests concurrently rather than sequentially is beneficial to experienced latency, thus the reason for adopting asynchrony and needing composition.

But is there an efficiency reason for adopting the reactive approach (either imperative or functional) in how we perform I/O? Are there benefits to using nonblocking I/O, or is blocking I/O threads to wait on a single network request okay? Performance testing I was involved in at Netflix demonstrated that there are objective and measurable efficiency benefits to adopting nonblocking I/O and event loops over thread-per-request blocking I/O.

Reactive abstraction

Ultimately RxJava types and operators are just an abstraction over imperative callbacks. However, this abstraction completely changes the coding style and provides very powerful tools for doing async and nonblocking programming. It takes effort to learn and requires a shift of thinking to be comfortable with function composition and thinking in streams, but when you’ve achieved this it is a very effective tool alongside our typical object-oriented and imperative programming styles.

(To Be Continued)