After many false starts, it finally dawned on us that by dualizing the Iterable/Iterator interface for synchronous collections, we could obtain a pair of interfaces to represent asynchronous event streams, with all the familiar sequence operators such as map, filter, scan, zip, and groupBy for transforming and combining asynchronous data streams. Thus Rx was born somewhere in the summer of 2007.
Fast forward to 2016 and the popularity and use of Rx have skyrocketed. All traffic through the Netflix API relies upon RxJava, as does the Hystrix fault-tolerance library that bulkheads all internal service traffic. Via the related reactive libraries RxNetty and Mantis, Netflix is now creating a completely reactive network stack for connecting all internal services across machine and process boundaries.
RxJava is also extremely successful in the Android space, with companies like SoundCloud, Square, NYT, and SeatGeek all using RxJava for their Android apps and contributing to the RxAndroid extension library. NoSQL vendors such as Couchbase and Splunk also offer Rx-based bindings to their data access layer.
The original implementation of Rx in .NET focussed squarely on transforming asynchronous event streams, and used asynchronous enumerables for scenarios that needed back pressure. Since Java does not have language support for async/await, the community extended the Observer and Observable types with the concept of reactive pull and introduced the Producer interface.
RxJava is a specific implementation of reactive programming for Java and Android that is influenced by functional programming. It favors function composition, avoidance of global state and side effects, and thinking in streams to compose asynchronous and event-based programs. It begins with the observer pattern of producer/consumer callbacks and extends it with dozens of operators that allow composing, transforming, scheduling, throttling, error handling, and lifecycle management.
Reactive programming is a general programming term that is focused on reacting to changes, such as data values or events. It can be, and often is, done imperatively. A callback is an approach to reactive programming done imperatively. A spreadsheet is a great example of reactive programming: cells dependent on other cells automatically “react” when those other cells change.
Reactive-functional programming therefore is an approach to programming—an abstraction on top of imperative systems—that allows us to program asynchronous and event-driven use cases without having to think like the computer itself and imperatively define the complex interactions of state, particularly across thread and network boundaries.
So, the short answer to what reactive-functional programming is solving is concurrency and parallelism. More colloquially, it is solving callback hell, which results from addressing reactive and asynchronous use cases in an imperative way.
Reactive programming is useful in scenarios such as the following:
- Processing user events such as mouse movement and clicks, keyboard typing, GPS signals changing over time as users move with their device, device gyroscope signals, touch events, and so on
- Responding to and processing any and all latency-bound IO events from disk or network, given that IO is inherently asynchronous
- Handling events or data pushed at an application by a producer it cannot control
If the code in question is handling only one event stream, reactive-imperative programming with a callback is going to be fine, and bringing in reactive-functional programming is not going to give you much benefit. If your program is like most though, you need to combine events (or asynchronous responses from functions or network calls), have conditional logic interacting between them, and must handle failure scenarios and resource cleanup on any and all of them. This is where the reactive-imperative approach begins to dramatically increase in complexity and reactive-functional programming begins to shine.
Hence this is where the tagline for Reactive Extensions (Rx) in general and RxJava specifically comes from, “a library for composing asynchronous and event-based programs.” RxJava is a concrete implementation of reactive programming principles influenced by functional and data-flow programming.
The entire point of RxJava being reactive is to support push, so the Observable and related Observer type signatures support events being pushed at it. This in turn generally is accompanied by asynchrony, which is discussed in the next section. But the Observable type also supports an asynchronous feedback channel (also sometimes referred to as async-pull or reactive-pull), as an approach to flow control or backpressure in async systems.
Upon subscription, the Observer can have three types of events pushed to it:
- Data via onNext()
- Errors via onError()
- Stream completion via onCompleted()
The onNext() method might never be called or might be called once, many, or infinite times. The onError() and onCompleted() methods are terminal events, meaning that only one of them can be called and only once. When a terminal event is called, the Observable stream is finished and no further events can be sent over it. Terminal events might never occur if the stream is infinite and does not fail.
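The event contract can be sketched with plain JDK types. This is a minimal illustration, not RxJava's own code: MiniObserver, emitParsed, and record are hypothetical names introduced here, and the producer simply parses strings so that a failure can occur partway through the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ObserverContractSketch {
    /** Hypothetical stand-in for an Rx-style observer; not RxJava's own interface. */
    interface MiniObserver<T> {
        void onNext(T value);       // data: zero, one, or many times
        void onError(Throwable e);  // terminal: the stream failed
        void onCompleted();         // terminal: the stream ended normally
    }

    /** Parses and emits each input, then sends exactly one terminal event. */
    static void emitParsed(List<String> raw, MiniObserver<Integer> observer) {
        for (String text : raw) {
            int value;
            try {
                value = Integer.parseInt(text);
            } catch (NumberFormatException e) {
                observer.onError(e);  // terminal, so stop emitting
                return;
            }
            observer.onNext(value);
        }
        observer.onCompleted();
    }

    /** Records the sequence of events an observer receives for the given input. */
    static List<String> record(List<String> raw) {
        List<String> log = new ArrayList<>();
        emitParsed(raw, new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("1", "2", "3")));
        System.out.println(record(Arrays.asList("1", "x", "3")));
    }
}
```

The first call logs three onNext events followed by onCompleted. The second logs one onNext and then onError, after which nothing else is delivered.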
There is an additional type of signature to permit interactive pull:
This is used with a more advanced Observer called Subscriber.
In general, an Observable is going to be asynchronous, but it doesn’t need to be. An Observable can be synchronous, and in fact defaults to being synchronous. RxJava never adds concurrency unless it is asked to do so.
A synchronous Observable would be subscribed to, emit all data using the subscriber’s thread, and complete (if finite). An Observable backed by blocking network I/O would synchronously block the subscribing thread and then emit via onNext() when the blocking network I/O returned.
For example, the following is completely synchronous:
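A minimal plain-JDK sketch of a synchronous source, under the assumption that a source is just a function invoked on subscription. MiniSource is a hypothetical stand-in, not an RxJava type. Because no thread or scheduler is introduced, the value is delivered on the caller's thread before subscribe returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SynchronousSourceSketch {
    /** Hypothetical stand-in for an Rx-style source; subscribing runs the producer. */
    interface MiniSource<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Logs where the value was received and what ran after subscribe() returned. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // No thread or scheduler is involved, so emission happens on the caller's thread.
        MiniSource<String> source = onNext -> onNext.accept("Hello World!");
        source.subscribe(value -> log.add(value + " on " + Thread.currentThread().getName()));
        // By the time subscribe() returns, the value has already been delivered.
        log.add("after subscribe on " + Thread.currentThread().getName());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```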
The Observable type is purposefully agnostic with regard to async versus sync, and whether concurrency exists or where it comes from. This is by design and allows the implementation of the Observable to decide what is best.
If data exists in a local in-memory cache (with constant microsecond/nanosecond lookup times), it does not make sense to pay the scheduling cost to make it asynchronous. The Observable can just fetch the data synchronously and emit it on the subscribing thread, as shown here:
This scheduling choice is powerful when the data might or might not be in memory. If it is in memory, emit it synchronously; if it’s not, perform the network call asynchronously and return the data when it arrives. This choice can reside conditionally within the Observable.
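The conditional choice between synchronous and asynchronous delivery can be sketched with JDK types alone. Everything named here (CACHE, fetchRemotely, getData) is a hypothetical illustration, and CompletableFuture stands in for whatever async mechanism the real source would use.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class CacheOrNetworkSketch {
    static final Map<String, String> CACHE = new ConcurrentHashMap<>();

    /** Hypothetical lookup standing in for a network call. */
    static String fetchRemotely(String key) {
        return "remote:" + key;
    }

    /** Delivers synchronously on a cache hit, asynchronously otherwise. */
    static CompletableFuture<Void> getData(String key, Consumer<String> onNext) {
        String cached = CACHE.get(key);
        if (cached != null) {
            onNext.accept(cached);  // caller's thread, no scheduling cost
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture
                .supplyAsync(() -> fetchRemotely(key))  // the lookup runs on a pool thread
                .thenAccept(value -> {
                    CACHE.put(key, value);
                    onNext.accept(value);
                });
    }

    public static void main(String[] args) {
        getData("user-1", v -> System.out.println("first: " + v)).join();
        getData("user-1", v -> System.out.println("second: " + v));
    }
}
```

The returned future exists only so a caller can wait for the asynchronous path. The consumer is unaware of which path delivered its value.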
The more common reason for remaining synchronous is stream composition and transformation via operators. Most use of RxJava involves its large API of operators that manipulate, combine, and transform data, such as map(), filter(), and groupBy(). Most of these operators are synchronous, meaning that they perform their computation synchronously inside the onNext() as the events pass by.
The important thing to understand here is that most Observable function pipelines are synchronous (unless a specific operator needs to be async, such as observeOn), whereas the Observable itself can be async.
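A minimal plain-JDK sketch of an async source with a synchronous transformation step. The subscribe helper and the "producer" thread name are hypothetical, and a raw Thread stands in for whatever scheduler a real Observable would use.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class AsyncSourceSketch {
    /** Hypothetical async source: emits three values on its own thread. */
    static Thread subscribe(Consumer<String> onNext) {
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                onNext.accept("SOME VALUE => " + i);  // the mapping runs synchronously here
            }
        }, "producer");
        producer.start();
        return producer;  // returns at once; it does not wait for the events
    }

    /** Subscribes, then waits for the producer so the collected events can be returned. */
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        Thread producer = subscribe(v -> received.add(v + " on " + Thread.currentThread().getName()));
        // Usually printed before any event, because the producer thread is still starting.
        System.out.println("subscribe() has returned");
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```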
In this example, the Observable is async (it emits on a thread different from that of the subscriber), so subscribe is nonblocking, and the println at the end will output before events are propagated and “SOME VALUE ⇒” output is shown.
Individual Observable streams permit neither concurrency nor parallelism. Instead, they are achieved via composition of async Observables.
The contract of an RxJava Observable is that events (onNext(), onCompleted(), onError()) can never be emitted concurrently. In other words, a single Observable stream must always be serialized and thread-safe. Each event can be emitted from a different thread, as long as the emissions are not concurrent. This means no interleaving or simultaneous execution of onNext(). If onNext() is still being executed on one thread, another thread cannot begin invoking it again (interleaving).
Here’s an example of what’s okay:
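As a plain-JDK sketch of legal emission (the emit helper is hypothetical, not RxJava API), a single background thread can deliver every event one after another. The events leave the subscribing thread, yet no two onNext calls ever overlap.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class SerializedEmissionSketch {
    /** Emits on a background thread, but strictly one call at a time. */
    static Thread emit(Consumer<String> onNext) {
        Thread producer = new Thread(() -> {
            onNext.accept("one");
            onNext.accept("two");
            onNext.accept("three");
            onNext.accept("four");
        });
        producer.start();
        return producer;
    }

    /** Collects the events in the order they were delivered. */
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        Thread producer = emit(received::add);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```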
Here’s an example of code that is illegal:
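As a plain-JDK sketch of the illegal case (all names are hypothetical), two threads call the same onNext with no coordination. The latch is an assumption added purely to force the overlap so that it can be observed reliably; in real code the overlap would be an intermittent race.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ConcurrentEmissionSketch {
    /** Two threads emit into one consumer with no coordination: a contract violation. */
    static void emitIllegally(Consumer<String> onNext) {
        Thread a = new Thread(() -> { onNext.accept("one"); onNext.accept("two"); });
        Thread b = new Thread(() -> { onNext.accept("three"); onNext.accept("four"); });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the largest number of onNext calls that were in flight at once. */
    static int maxOverlap() {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch bothInside = new CountDownLatch(2);  // forces the overlap for the demo
        emitIllegally(value -> {
            max.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            bothInside.countDown();
            try {
                bothInside.await(2, TimeUnit.SECONDS);  // hold until the other thread is inside too
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
        });
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent onNext calls: " + maxOverlap());
    }
}
```

An overlap count above one is exactly what the contract forbids: the consumer would need to be thread-safe to survive it.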
A single Observable stream is always serialized, but each Observable stream can operate independently of one another, and thus concurrently and/or in parallel.
Here is a contrived example showing the mechanics of two asynchronous Observables running on separate threads and merged together:
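A minimal plain-JDK sketch of the same mechanics, assuming threads in place of async Observables. The names a, b, and c are illustrative, startSource is a hypothetical helper, and the lock inside c plays the role of the serialization a merge operator would provide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class MergeSketch {
    /** Hypothetical async source: emits the given values in order on its own thread. */
    static Thread startSource(List<String> values, Consumer<String> onNext) {
        Thread producer = new Thread(() -> values.forEach(onNext));
        producer.start();
        return producer;
    }

    /** Merges two sources and returns the events in arrival order. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        // c merges both sources; the lock keeps deliveries serialized, one at a time.
        Consumer<String> c = value -> {
            synchronized (received) {
                received.add(value);
            }
        };
        Thread a = startSource(Arrays.asList("one", "two"), c);
        Thread b = startSource(Arrays.asList("three", "four"), c);
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```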
Observable c will receive items from both a and b, and due to their asynchrony, three things occur:
- “one” will appear before “two”
- “three” will appear before “four”
- The order between one/two and three/four is unspecified
So why not just allow onNext() to be invoked concurrently? The first reason is that onNext() is meant for us humans to use, and concurrency is difficult. If onNext() could be invoked concurrently, it would mean that every Observer would need to code defensively for concurrent invocation, even when not expected or wanted.
A second reason is that some operations just aren’t possible with concurrent emission; for example, scan and reduce, which are common and important behaviors. Operators such as scan and reduce require sequential event propagation so that state can be accumulated on streams of events that are not both associative and commutative. Allowing concurrent Observable streams (with concurrent onNext()) would limit the types of events that can be processed and require thread-safe data structures.
A third reason is that performance is affected by synchronization overhead because all observers and operators would need to be thread-safe, even if most of the time data arrives sequentially. Despite the JVM often being good at eliminating synchronization overhead, it is not always possible (particularly with nonblocking algorithms using atomics) so this ends up being a performance tax not needed on sequential streams.
Additionally, it is often slower to do generic fine-grained parallelism. Parallelism typically needs to be done coarsely, such as in batches of work, to make up for the overhead of switching threads, scheduling work, and recombining.
The Observable type is lazy, meaning it does nothing until it is subscribed to. This differs from an eager type such as a Future, which when created represents active work. Laziness allows composing Observables together without data loss due to race conditions without caching.
In practice, this means two things:
- Subscription, not construction, starts work
Due to the laziness of an Observable, creating one does not actually cause any work to happen (ignoring the “work” of allocating the Observable object itself). All it does is define what work should be done when it is eventually subscribed to. Consider an Observable defined like this:
The someData reference now exists, but getDataFromServerWithCallback is not yet being executed. All that has happened is that the Observable wrapper has been declared around a unit of work to be performed, the function that lives inside the Observable.
Subscribing to the Observable causes the work to be done:
This lazily executes the work represented by the Observable.
- Observables can be reused
Because the Observable is lazy, it also means a particular instance can be invoked more than once. Continuing with the previous example, this means we can do the following:
Now there will be two separate subscriptions, each calling getDataFromServerWithCallback and emitting events.
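Both consequences of laziness can be sketched with plain JDK types. MiniSource is a hypothetical stand-in for an Observable, and getDataFromServerWithCallback here is an invented local stub that merely counts its invocations; its real signature is not given in the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class LazySourceSketch {
    /** Hypothetical stand-in for an Rx-style source; subscribing runs the work. */
    interface MiniSource<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Invented stub for the server call; counts how often it actually runs. */
    static void getDataFromServerWithCallback(AtomicInteger calls, Consumer<String> callback) {
        callback.accept("data#" + calls.incrementAndGet());
    }

    static List<String> run() {
        AtomicInteger calls = new AtomicInteger();
        List<String> log = new ArrayList<>();
        // Construction only captures the work; nothing is fetched yet.
        MiniSource<String> someData = onNext -> getDataFromServerWithCallback(calls, onNext);
        log.add("calls after construction: " + calls.get());
        someData.subscribe(log::add);  // the first subscription runs the work
        someData.subscribe(log::add);  // the second subscription runs it again
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log shows zero calls after construction, followed by one fetch per subscription.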
This laziness is powerful when doing composition. For example:
In this case, the fallback Observable represents work that can be done, but will only be done if something subscribes to it, and that we only want subscribed to if someData fails. Of course, eager types can be made lazy by using function calls (such as
The Rx Observable is the async “dual” of an Iterable. By “dual,” we mean the Observable provides all the functionality of an Iterable except in the reverse flow of data: it is push instead of pull.
The fact that it behaves as a dual effectively means anything you can do synchronously via pull with an Iterable and Iterator can be done asynchronously via push with an Observable and Observer. This means that the same programming model can be applied to both!
For example, as of Java 8 an Iterable can be upgraded to have function composition via the java.util.stream.Stream type to work like this:
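A small sketch of that pull-based composition using only the JDK. The data source getDataFromLocalMemorySynchronously and its 75 generated strings are assumptions made for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamPullSketch {
    /** Hypothetical in-memory data source holding 75 strings. */
    static List<String> getDataFromLocalMemorySynchronously() {
        return IntStream.range(0, 75)
                .mapToObj(i -> "item" + i)
                .collect(Collectors.toList());
    }

    /** Skips, limits, and transforms by pulling items through the pipeline. */
    static List<String> run() {
        return getDataFromLocalMemorySynchronously().stream()
                .skip(10)
                .limit(5)
                .map(s -> s + "_transformed")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The terminal operation pulls five items, item10 through item14, each with the _transformed suffix appended.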
An Observable is used the same way:
In other words, the Rx Observable allows programming with async data via push just like Lists using synchronous pull.
The Observable supports asynchronously pushing multiple values. This nicely fits into the lower right of the following table, the async dual of Iterable (or Stream, List, Enumerable, etc.) and multivalued version of a Future.
So, why might an Observable be valuable instead of just a Future? The most obvious reason is that you are dealing with either an event stream or a multivalued response. The less obvious reason is composition of multiple single-valued responses.
Event stream is straightforward. Over time the producer pushes events at the consumer, as demonstrated here:
This doesn’t work very well with a Future. The onSuccess callback could have received the “last event,” but some questions remain: Does the consumer now need to poll? Will the producer enqueue them? Or will they be lost in between each fetch? The Observable is definitely beneficial here. In the absence of Observable, a callback approach would be better than modeling this with a Future.
Multivalued responses are the next use of Observable. Basically, anywhere that a Stream would be used, Observable can be used instead:
Now, this can work with a Future, like this:
So why use the Observable approach?
The most compelling reason is that items can be processed as received rather than waiting for the entire collection to arrive. This is particularly true when different network latencies on the backend can affect each item differently, which is actually fairly common due to long-tail latencies (such as in service-oriented or microservice architectures) and shared data stores.
The Observable type is also useful when composing single-valued responses, such as from Futures.
When merging together multiple Futures, they emit another Future with a single value, such as this:
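A short sketch with the JDK's CompletableFuture, where getDataA and getDataB are hypothetical accessors. The combined future produces its single value only once both inputs have completed.

```java
import java.util.concurrent.CompletableFuture;

class CombineFuturesSketch {
    /** Hypothetical async accessors, each yielding a single value. */
    static CompletableFuture<String> getDataA() {
        return CompletableFuture.supplyAsync(() -> "A");
    }

    static CompletableFuture<String> getDataB() {
        return CompletableFuture.supplyAsync(() -> "B");
    }

    /** The combined future completes only after both inputs have completed. */
    static CompletableFuture<String> combined() {
        return getDataA().thenCombine(getDataB(), (a, b) -> a + b);
    }

    public static void main(String[] args) {
        System.out.println(combined().join());
    }
}
```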
That might be exactly what is wanted, and is actually available in RxJava via Observable.zip.
However, it means waiting until all Futures are completed before emitting anything. Oftentimes, it is preferable to emit each returned Future value as it completes. In this case, Observable.merge (or the related flatMap) is preferable. It allows composing the results (even if each is just an Observable emitting one value) into a stream of values that are each emitted as soon as they are ready:
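The emit-as-ready behavior can be approximated with plain CompletableFuture callbacks. This is a JDK analogue for illustration, not Observable.merge itself, and the futures are completed by hand so that the completion order is fixed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class EmitAsCompletedSketch {
    /** Pushes each result downstream as soon as its own future completes. */
    static List<String> run() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        List<String> received = new CopyOnWriteArrayList<>();
        a.thenAccept(received::add);
        b.thenAccept(received::add);
        b.complete("B");  // finishes first, so it is delivered first
        a.complete("A");  // finishes later, delivered second
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The consumer sees B before A, because delivery order follows completion order rather than declaration order.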
Now, despite the Rx Observable being great at handling multivalued streams, the simplicity of a single-valued representation is very nice for API design and consumption. Additionally, basic request/response behavior is extremely common in applications. For this reason, RxJava provides a Single type, which is a lazy equivalent to a Future.
For example, consider these accessors:
These can then be used and optionally composed like this:
Note how two Singles are merged into an Observable. This could result in an emission of [A, B] or [B, A], depending on which completes first.
Using Single instead of Observable to represent a “stream of one” simplifies consumption because a developer must consider only the following behaviors for the Single type:
- It can respond with an error
- It can never respond
- It can respond with a success
Compare this with the additional states a consumer must consider with an Observable:
- It can respond with an error
- Never respond
- Respond successfully with no data and terminate
- Respond successfully with a single value and terminate
- Respond successfully with multiple values and terminate
- Respond successfully with one or more values and never terminate (waiting for more data)
In addition to Single, RxJava also has a Completable type that addresses the surprisingly common use case of having no return type, just the need to represent successful or failed completion. Often Single<Void> ends up being used. This is awkward, so Completable came to be, as demonstrated here:
Completable itself is an abstraction for two callbacks, completion and failure, like this:
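The two-callback shape can be sketched with a plain interface. MiniCompletableObserver and writeToDatabase are hypothetical names used for illustration, not RxJava's own API.

```java
class CompletableSketch {
    /** Hypothetical two-callback shape: exactly one of these is called, once. */
    interface MiniCompletableObserver {
        void onCompleted();
        void onError(Throwable e);
    }

    /** Hypothetical write that reports only success or failure, never a value. */
    static void writeToDatabase(Object data, MiniCompletableObserver observer) {
        if (data == null) {
            observer.onError(new IllegalArgumentException("nothing to write"));
            return;
        }
        // ... the actual write would happen here ...
        observer.onCompleted();
    }

    /** Describes the outcome reported for the given input. */
    static String describe(Object data) {
        StringBuilder outcome = new StringBuilder();
        writeToDatabase(data, new MiniCompletableObserver() {
            @Override public void onCompleted() { outcome.append("completed"); }
            @Override public void onError(Throwable e) { outcome.append("failed: ").append(e.getMessage()); }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("row"));
        System.out.println(describe(null));
    }
}
```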
An Observable can support cardinalities from zero to infinity (which is explored more in “Infinite Streams”). But for simplicity and clarity, Single is an “Observable of One,” and Completable is an “Observable of None.”
Thus far, the argument for the reactive-functional style of programming has primarily been about providing an abstraction over async callbacks to allow more manageable composition. It is also fairly obvious that performing unrelated network requests concurrently rather than sequentially reduces the latency users experience, which is the reason for adopting asynchrony and needing composition.
But is there an efficiency reason for adopting the reactive approach (either imperative or functional) in how we perform I/O? Are there benefits to using nonblocking I/O, or is blocking I/O threads to wait on a single network request okay? Performance testing I was involved in at Netflix demonstrated that there are objective and measurable efficiency benefits to adopting nonblocking I/O and event loops over thread-per-request blocking I/O.
Ultimately RxJava types and operators are just an abstraction over imperative callbacks. However, this abstraction completely changes the coding style and provides very powerful tools for doing async and nonblocking programming. It takes effort to learn and requires a shift of thinking to be comfortable with function composition and thinking in streams, but when you’ve achieved this it is a very effective tool alongside our typical object-oriented and imperative programming styles.
(To Be Continued)