The aim of this chapter is to explain the fundamentals of RxJava’s operators and how you can compose them to build high-level, easy-to-reason-about data pipelines. One of the reasons why RxJava is so powerful is the rich universe of built-in operators it provides and the possibility of creating custom ones. An operator is a function that takes an upstream
Observable<T> and returns a downstream
Observable<R>, where the types
T and R might or might not be the same. Operators allow composing simple transformations into complex processing graphs.
But even when you recognize how wonderful Rx operators are, the true power comes from combining them together. Chaining several operators, forking a stream into multiple substreams and then joining them back is idiomatic, and you should feel fairly comfortable with it.
Operators are typically instance methods on
Observable that somehow alter the behavior of the upstream
Observable as seen by downstream
Observables or Subscribers. This might sound complex, but it is actually quite flexible and not that difficult to grasp. One of the simplest examples of operators is
filter(), which takes a predicate and either passes events further or discards them:
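A minimal sketch of filter() with a simple numeric predicate (the even-numbers stream here is an illustration, not the book’s original listing):

```java
import rx.Observable;

Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);

// Only events satisfying the predicate are passed downstream;
// the rest are silently discarded
Observable<Integer> even = numbers.filter(x -> x % 2 == 0);

even.subscribe(System.out::println);  // prints 2 and 4
```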
It’s now time to introduce the so-called marble diagrams, the ubiquitous visualizations in RxJava documentation. A marble diagram illustrates how various operators work:
The diagram that follows is a concrete example of a marble diagram representing a
filter() operator. The
Observable.filter() returns the exact same events (so the marbles on top and on the bottom are the same), but some events are skipped because they did not satisfy the predicate:
When dealing with certain types of Observables, some events might not be of interest to you, for example when consuming high volumes of data. It is also a common practice to
filter() the same
Observable multiple times, each time with a different predicate.
In RxJava, you must forget about mutating data structures internally: modifying variables outside of the stream is considered very nonidiomatic and dangerous. Every single operator returns a new
Observable, leaving the original one untouched.
This makes reasoning about the flow of events much simpler. You can fork a stream into multiple independent sources, each having different characteristics. One of the powers of RxJava is that you can reuse a single
Observable in multiple places without affecting other consumers. If you pass an
Observable to some unknown function you can be sure that this
Observable will not become corrupted in any way by that function.
Imagine that you have a stream of some events and you must perform a certain transformation on each event. This can be decoding from JSON to a Java object (or vice versa), enriching, wrapping, extracting from the event, and so on. This is where the invaluable
map() operator is useful.
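A sketch of map(), showing the same transformation written both with an explicit Func1 and with a Java 8 method reference (the length() extraction is just an illustration):

```java
import rx.Observable;
import rx.functions.Func1;

Observable<String> words = Observable.just("Lorem", "ipsum", "sit");

// Verbose: an explicit Func1 implementation
Observable<Integer> lengths = words.map(new Func1<String, Integer>() {
    @Override
    public Integer call(String word) {
        return word.length();
    }
});

// Compact: Java 8 method reference, relying on type inference
Observable<Integer> sameLengths = words.map(String::length);
```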
All of the ways in which you can define the dates
Observable are equivalent, from the most verbose using an explicit
Func1<T, R> to the most compact Java 8 syntax with method references and type inference.
Observables are lazy, meaning that they do not begin producing events until someone subscribes. You can create infinite streams that take hours to compute the first value, but until you actually express your desire to be notified about these events,
Observable is just a passive and idle data structure for some type T.
This even applies to hot
Observables — even though the source of events keeps producing them, not a single operator like
filter() is evaluated until someone actually shows an interest. Otherwise, running all of these computational steps and throwing away the result would make no sense.
flatMap() is among the most important operators in RxJava. At first sight, it is similar to
map(), but the transformation of each element can return another (nested, inner)
Observable. Because an Observable can represent another asynchronous operation, we quickly discover that
flatMap() can be used to spawn asynchronous computation for each upstream event (fork execution) and join the results back. Conceptually, flatMap() takes an
Observable<T> and a function from T to Observable<R>.
flatMap() first constructs an
Observable<Observable<R>>, replacing all upstream values of type T with an
Observable<R> (just like map()).
However, it does not stop there: it automatically subscribes to these inner
Observable<R> streams to produce a single stream of type
R, containing all values from all inner streams, as they come. The marble diagram that follows shows how this works:
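The behavior just described can be sketched as follows; each upstream value is replaced with a small inner Observable, and flatMap() merges all inner streams into one:

```java
import rx.Observable;

Observable<Integer> flat = Observable
        .just(1, 2, 3)
        .flatMap(x -> Observable.just(x, x * 10));

// With synchronous inner streams the result is 1, 10, 2, 20, 3, 30;
// with asynchronous inner streams the values could interleave
flat.subscribe(System.out::println);
```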
Because RxJava subscribes to the inner streams concurrently and merges them together, events produced from one inner
Observable can interleave with events from another.
flatMap() is the most fundamental operator in RxJava; using it, one can easily implement map() or filter().
As a rule of thumb, you use
flatMap() for the following situations:
- The result of the transformation in
map() must be an
Observable. For example, performing a long-running, asynchronous operation on each element of the stream without blocking
- You need a one-to-many transformation; a single event is expanded into multiple subevents. For example, a stream of customers is translated into streams of their orders, where each customer can have an arbitrary number of orders
Now imagine that you would like to use a method returning an Iterable (like a List or a
Set). For example, if
Customer has a simple
List<Order> getOrders(), you are forced to go through several operators to take advantage of it in flatMap():
Or, equivalent and equally verbose:
The need to map from a single item to
Iterable is so popular that an operator,
flatMapIterable(), was created to perform just such a transformation:
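A sketch of that transformation, with hypothetical Customer and Order types; the manual map()/flatMap() chain is shown first for comparison:

```java
import rx.Observable;
import java.util.Arrays;
import java.util.List;

class Order { }
class Customer {
    List<Order> getOrders() { return Arrays.asList(new Order(), new Order()); }
}

Observable<Customer> customers = Observable.just(new Customer(), new Customer());

// Going through map() and flatMap() manually...
Observable<Order> orders = customers
        .map(Customer::getOrders)
        .flatMap(Observable::from);

// ...or letting flatMapIterable() do the unwrapping for us
Observable<Order> sameOrders = customers
        .flatMapIterable(Customer::getOrders);
```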
You must take care when simply wrapping methods in an Observable. If
getOrders() were not a simple getter but an expensive operation in terms of run time, it would be better to implement
getOrders() to explicitly return an Observable<Order>.
Another interesting variant of
flatMap() can react not only to events, but to any notification, namely events, errors, and completion. The simplified signature of this
flatMap() overload follows. For an
Observable<T>, we must provide the following:
- A function mapping a single event T → Observable<R>
- A function mapping an error notification → Observable<R>
- A no-arg function reacting on upstream completion that can return Observable<R>
Imagine that you are creating a service that uploads videos. It takes a
UUID and returns upload progress as an
Observable<Long> indicating how many bytes it has transferred so far. We can take advantage of that progress, for example by displaying it in the user interface. But what we are really interested in is completion, when the upload is finally done. Only after a successful upload can we begin rating the video. A naive implementation can simply subscribe to the progress stream, ignoring events and only reacting on completion (the last callback):
However, notice that the
rate() method actually returns
Observable<Rating> that got lost. What we really want is for the
store() method to return that second
Observable<Rating>. But we can’t simply invoke upload() and
rate() concurrently, because the latter will fail if the former has not finished yet. The answer is
flatMap() again, in its most complex form:
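A sketch of that flatMap() overload, with upload() and rate() stubbed out (in reality they would wrap asynchronous I/O):

```java
import rx.Observable;
import java.util.UUID;

class Rating { }

Observable<Long> upload(UUID id) {
    return Observable.just(10L, 20L, 30L);        // stubbed progress stream
}

Observable<Rating> rate(UUID id) {
    return Observable.just(new Rating());         // stubbed rating service
}

Observable<Rating> store(UUID id) {
    return upload(id).flatMap(
            bytes -> Observable.<Rating>empty(),  // discard progress events
            e -> Observable.error(e),             // pass errors through
            () -> rate(id));                      // on completion, switch to rate()
}
```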
Take a moment to digest the preceding code snippet. We have an
Observable<Long> as returned by the
upload() method. For each progress update of type Long we return
Observable.empty(), effectively discarding these events.
Moreover, we are not interested in errors here, but rather than swallowing or logging them, we pass them through to the subscriber. The rule of thumb is that if you don’t know how to handle an exception, let your supervisor (e.g., the calling method, parent task, or downstream
Observable) make a decision.
Finally, the last lambda expression (
() -> rate(id)) reacts upon stream completion. At this point, we replace completion notification with another
Observable<Rating>. So, even if the original
Observable wanted to terminate, we ignore that and, in a way, append a different Observable<Rating>.
Keep in mind that all three callbacks must return an
Observable<R> of the same type R.
In practice, simpler combinations of operators can often express the same logic more clearly and efficiently than this overload. Just to make sure you understand the syntactic part of
flatMap(), another abstract example translates from a sequence of characters to Morse code:
As you can clearly see, every character is replaced by a sequence of DI and
DAH sounds (dots and dashes). When a character is unrecognizable, an empty sequence is returned.
flatMap() ensures that we get a steady, flat stream of sounds, as opposed to the
Observable<Observable<Sound>>, which we would get with plain map().
delay() basically takes an upstream
Observable and shifts all events further in time.
We can replace delay() with
timer() and (surprise!)
flatMap(), like this:
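A sketch, using placeholder String values for x, y, and z:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

// timer() emits a single 0L after one second; flatMap() replaces
// that artificial event with three immediately emitted values
Observable<String> delayed = Observable
        .timer(1, TimeUnit.SECONDS)
        .flatMap(i -> Observable.just("x", "y", "z"));
```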
I hope this is clear: we generate an artificial event from
timer() that we completely ignore. However, using
flatMap() we replace that artificial event (the zero in the i variable) with three immediately emitted values: x, y, and
z. This is somewhat equivalent to
just(x, y, z).delay(1, SECONDS) in this particular case; however, it is not so in general.
delay() is more comprehensive than
timer() because it shifts every single event further by a given amount of time, whereas
timer() simply “sleeps” and emits a special event after a given time.
For completeness, let us mention an overloaded variant of
delay() that can compute the
delay on a per-event basis rather than globally for every event. The following code snippet delays the emission of every
String depending on how long that String is:
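A sketch consistent with the behavior described next, assuming a lorem-ipsum word list:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

Observable
        .just("Lorem", "ipsum", "dolor", "sit", "amet",
              "consectetur", "adipiscing", "elit")
        // each word is delayed by as many seconds as it has letters
        .delay(word -> Observable.timer(word.length(), TimeUnit.SECONDS))
        .subscribe(System.out::println);
```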
When running this program, even after subscribing, your application will terminate immediately without displaying any results because emission occurs in the background. In Chapter 4, you will learn about
BlockingObservable that makes such simple testing easier.
What you will notice then is that the first word to occur is
sit, followed by
elit one second later. Remember that
delay() can be rewritten with timer() and
flatMap(). What flatMap() essentially does is take a master sequence (
Observable) of values appearing over time (events) and replace each of the events with an independent subsequence. These subsequences are generally unrelated to one another and to the event that generated them in the master sequence.
To make it clear, you no longer have a single master sequence but a set of
Observables, each working on its own, coming and going over time. Therefore,
flatMap() cannot give any guarantee about the order in which those subevents will arrive at the downstream operator/subscriber. Take this simple code snippet as an example:
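A sketch matching the description that follows: events 10L and 1L, each delayed by its own value in seconds:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

Observable
        .just(10L, 1L)
        // each event is delayed by its own value in seconds
        .flatMap(x -> Observable.just(x).delay(x, TimeUnit.SECONDS))
        .subscribe(System.out::println);  // prints 1 first, then 10
```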
In this example, we delay event
10L by 10 seconds and event
1L (chronologically appearing later in upstream) by 1 second. As a result, we see
1 after a second and
10 nine seconds later—the order of events in upstream and downstream is different!
What if you absolutely need to keep the order of downstream events so that they align perfectly with upstream events? In other words, downstream events resulted from upstream event
N must occur before events from
N + 1. It turns out there is a handy
concatMap() operator that has the exact same syntax as
flatMap() but works quite differently:
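A sketch, with loadRecordsFor() as a hypothetical asynchronous loader that yields a few records per day:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

// Hypothetical: each day yields three records, emitted asynchronously
Observable<String> loadRecordsFor(String day) {
    return Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .map(i -> day + "-record-" + i);
}

Observable<String> records = Observable
        .just("Sunday", "Monday")
        .concatMap(day -> loadRecordsFor(day));
// All Sunday records appear before any Monday record
```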
This time the output is exactly what we anticipated:
So what happened under the hood? When the first event (Sunday) appears from upstream,
concatMap() subscribes to an
Observable returned from
loadRecordsFor() and passes all events emitted from it downstream. When this inner stream completes,
concatMap() waits for the next upstream event (Monday) and continues.
concatMap() does not introduce any concurrency whatsoever but it preserves the order of upstream events, avoiding overlapping.
Suppose that you have a large list of users wrapped in an
Observable. Each User has a
loadProfile() method that returns an
Observable<Profile> instance fetched using an HTTP request. Our aim is to load the profiles of all users as fast as possible.
flatMap() was designed exactly for that: to allow spawning concurrent computation for each upstream value:
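A sketch with stubbed User and Profile types; in reality loadProfile() would wrap an HTTP request:

```java
import rx.Observable;
import rx.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

class Profile { }
class User {
    Observable<Profile> loadProfile() {
        // hypothetical HTTP request, executed on the io() scheduler
        return Observable
                .fromCallable(Profile::new)
                .subscribeOn(Schedulers.io());
    }
}

List<User> users = Arrays.asList(new User(), new User(), new User());

Observable<Profile> profiles = Observable
        .from(users)
        .flatMap(User::loadProfile);
```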
At first sight it looks great.
Observable<User> is constructed from a fixed List using the
from() operator; thus, when subscribed, it emits all users pretty much instantaneously. For every new User,
flatMap() transparently subscribes to a new
Observable<Profile>, redirecting all Profile events downstream. Subscription to an inner
Observable<Profile> most likely makes a new HTTP connection. Therefore, if we have, say, 10,000 Users, we suddenly trigger 10,000 concurrent HTTP connections. If all of them hit the same server, we can expect any of the following:
- Rejected connections
- Long wait time and timeouts
- Crashing the server
- Hitting a rate limit or being blacklisted
- Overall latency increase
- Issues on the client, including too many open sockets, threads, and excessive memory usage
Increasing concurrency pays off only up to a certain point. If you try to run too many operations concurrently, you will most likely end up with a lot of context switches, high memory and CPU utilization, and overall performance degradation.
One solution could be to slow down
Observable<User> somehow so that it does not emit all Users at once. However, tuning that delay to achieve the optimal concurrency level is troublesome. Instead,
flatMap() has a very simple overloaded version that limits the total number of concurrent subscriptions to inner streams:
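A sketch of the overload; at most 10 inner streams are subscribed to at any given time (loadProfile() is a stub standing in for an expensive asynchronous call):

```java
import rx.Observable;

// Stub standing in for an expensive asynchronous call
Observable<String> loadProfile(int id) {
    return Observable.just("profile-" + id);
}

Observable<String> profiles = Observable
        .range(1, 10_000)
        // no more than 10 concurrent inner subscriptions
        .flatMap(id -> loadProfile(id), 10);
```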
maxConcurrent parameter limits the number of ongoing inner
Observables. In practice when
flatMap() receives the first 10 Users it invokes
loadProfile() for each of them. However, when the 11th User appears from upstream,
flatMap() will not even call
loadProfile(). Instead, it will wait for any ongoing inner streams to complete. Therefore, the maxConcurrent parameter limits the number of background tasks that are forked from flatMap().
You can probably see that
concatMap(f) is semantically equivalent to
flatMap(f, 1), that is, flatMap() with
maxConcurrent equal to one. We could spend a couple of extra pages discussing the nuances of
flatMap(), but more exciting operators lie ahead of us.
Transforming a single
Observable is interesting, but what if there are more
Observables that need to cooperate? If you come from traditional concurrent programming in Java, full of Threads and Executors, you know how difficult shared mutable state and synchronization are. Fortunately, RxJava works even better in such circumstances.
Also the library has a consistent way of handling errors in all operators involving multiple streams. If any of the upstream sources emits an error notification, it will be forwarded downstream and complete the downstream sequence with an error, as well.
If more than one upstream
Observable emits an error, the first one wins and the others are discarded (any
Observable can emit
onError only once). Finally, if you want to continue processing and emit errors only when all normal events have been produced, many operators have a delayError variant, such as mergeDelayError().
Imagine that we have three algorithms that are already RxJava-enabled, each one nicely encapsulated within an
Observable. Of course, each algorithm alone can produce zero to possibly an infinite number of results:
What we would like to do is run these three algorithms side by side and receive results as soon as possible. We do not care which algorithm emitted an event, we want to catch all of them and aggregate into a single stream. This is what the
merge() operator does:
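A sketch with the three algorithms stubbed out; merge() subscribes to all of them and relays events as they arrive:

```java
import rx.Observable;

class LicensePlate {
    final String value;
    LicensePlate(String value) { this.value = value; }
}

// Stubs standing in for the three recognition algorithms
Observable<LicensePlate> fastAlgo()         { return Observable.just(new LicensePlate("FAST")); }
Observable<LicensePlate> preciseAlgo()      { return Observable.just(new LicensePlate("PRECISE")); }
Observable<LicensePlate> experimentalAlgo() { return Observable.empty(); }

Observable<LicensePlate> all = Observable.merge(
        preciseAlgo(),
        fastAlgo(),
        experimentalAlgo());
```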
I intentionally placed
preciseAlgo() (presumably slowest) first to emphasize that the order of
Observables passed to
merge() is rather arbitrary. The
merge() operator will keep a reference to all of the underlying
Observables, and as soon as someone subscribes to the resulting
Observable<LicensePlate> (named all here), it will automatically subscribe to all upstream
Observables at once.
Of course, the
merge() operator follows the Rx contract, ensuring that events are serialized (do not overlap), even if the underlying streams each emit a value at the same time. The following marble diagram illustrates how merge() works.
The merge() operator is used extensively when you want to treat multiple sources of events of the same type as a single source. Also, if you have just two
Observables you want to
merge(), you can use the
obs1.mergeWith(obs2) instance method.
Keep in mind that errors appearing in any of the underlying
Observables will be eagerly propagated to
Observers. You can use the
mergeDelayError() variant of
merge() to postpone any errors until all of the other streams have finished.
mergeDelayError() will even make sure to collect all exceptions, not only the first one, and encapsulate them in a CompositeException.
Zipping is the act of taking two (or more) streams and combining them with each other in such a way that each element from one stream is paired with a corresponding event from the other. A downstream event is produced by composing the first event from each stream, the second event from each stream, and so on.
Therefore, events appear only when all upstream sources emit an event. This is useful when you want to combine results from multiple streams that are somehow related to one another. Or, quite the contrary, when two independent streams emit values but only combining them together has business meaning. The following marble diagram illustrates how this works:
The zip() and zipWith() operators are equivalent. We use zipWith() when we want to fluently compose one stream with another, like so:
s1.zipWith(s2, ...). But when we have more than two streams to compose, the static
zip() on Observable can take up to nine streams.
To understand zip(), imagine that you have two independent streams, yet those streams are entirely synchronized with each other. For example, think about a
WeatherStation API that exposes temperature and wind measurements precisely every minute at the same time:
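A sketch with hypothetical Temperature, Wind, and Weather types:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

class Temperature { }
class Wind { }
class Weather {
    final Temperature temperature;
    final Wind wind;
    Weather(Temperature temperature, Wind wind) {
        this.temperature = temperature;
        this.wind = wind;
    }
}

// Hypothetical measurements, one per minute, perfectly synchronized
Observable<Temperature> temperatureMeasurements =
        Observable.interval(1, TimeUnit.MINUTES).map(i -> new Temperature());
Observable<Wind> windMeasurements =
        Observable.interval(1, TimeUnit.MINUTES).map(i -> new Wind());

// Every Temperature is paired with the corresponding Wind
Observable<Weather> weather = temperatureMeasurements
        .zipWith(windMeasurements, (temperature, wind) -> new Weather(temperature, wind));
```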
We have to make an assumption that events from these two
Observables are emitted at the same time and thus with the same frequency. Under this restriction, we can safely join these two streams by combining every pair of events. This means that when an event occurs on one stream, we must hold it until the other appears, and vice versa.
The name zip implies that there are two flows of events that we join together, one from left, one from right, repeat. But in a more general version,
zip() can take up to nine upstream Observables and emit an event only when all of them emit an event.
When a new
Temperature event occurs,
zipWith() waits (obviously without blocking!) for
Wind, and vice versa. Two events are passed to our custom lambda and combined into a
Weather object. Then, the cycle repeats.
So far, zip() has been described in terms of streams, even infinite ones.
However, often you will find yourself using
Observables that emit exactly one item. Such an
Observable is typically an asynchronous response to some request or action.
Suppose that you would like to plan a one-day vacation in some city when the weather is sunny and airfare and hotels are cheap. To do so, we will combine several streams together and come up with all possible results:
Quite a lot is happening in the preceding code. First, we generate all dates from tomorrow to 10 days ahead using a combination of range() and
map(). Then, we
flatMap() these days with three cities; we do not want to use
zip() here, because we need all possible combinations of date versus city pairs. For each such pair, we create an instance of the
Vacation class encapsulating it.
Now the real logic: we zip together three streams: Observable<Weather>, Observable<Flight>, and
Observable<Hotel>. The last two are supposed to return zero or one result, depending on whether a cheap flight or hotel was found for that city/date pair.
Observable<Weather> always returns something; however, we use
filter(Weather::sunny) to discard nonsunny weather. So we end up with a
zip() operation over three streams, each emitting zero or one items.
zip() completes as soon as any of the upstream
Observables completes, discarding the other streams early. Thanks to this property, if any of weather, flight, or hotel is absent, the result of
zip() completes with no items emitted, as well. This leaves us with a stream of all possible vacation plans matching the requirements.
Do not be surprised to see a
zip function that does not take its arguments into account:
(w, f, h) -> vacation. An outer stream of Vacation lists all possible vacation plans for every possible day. However, for each vacation, we want to make sure that the weather, a cheap flight, and a hotel are present. If all of these conditions are met, we return the vacation instance; otherwise,
zip() will not invoke our lambda expression at all.
If one of the streams outperforms the other even slightly, events from the faster
Observable will need to wait longer and longer for the lagging stream. To illustrate this effect, let’s first
zip() two streams that are producing items at the exact same pace:
The timestamp() operator wraps whatever the event type
T was with the
rx.schedulers.Timestamped<T> class, which has two attributes: the original value of type T and a
long timestamp of when it was created.
Inside the zip() transformation, we simply compare the time difference between the creation of events in each stream. When streams are synchronized, this value oscillates around zero. However, if we slightly slow down one
Observable, say green becomes
Observable.interval(11, MILLISECONDS), the situation is much different. The time difference between
red and green keeps going up:
red is consumed in real time, but it must wait longer and longer for the lagging green item. Over time this difference piles up and can lead to stale data or even a memory leak. In practice,
zip() must be used carefully.
What we actually expect is emitting a pair every time any upstream produces an event, using the latest known value from the other stream. This is where
combineLatest() becomes useful, as illustrated by the following marble diagram:
Take the following artificial example. One stream produces
S0, S1, S2… values every 17 milliseconds, whereas the other produces
F0, F1, F2… every 10 milliseconds (considerably faster):
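A sketch matching those timings:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

Observable.combineLatest(
        Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x),
        Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x),
        // a new pair is emitted whenever either stream produces a value
        (s, f) -> f + ":" + s
).forEach(System.out::println);
```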
We combine these two streams and produce a new value every time any of the streams produces something. The output quickly becomes out-of-sync, but at least values are consumed in real time, and the faster stream does not need to wait for the slower one:
combineLatest() is symmetric, which means that it does not distinguish between the substreams it combines. Occasionally, however, you want to emit an event every time something appears in one stream, combined with the latest value from the second stream, but not vice versa.
In other words, events from the second stream do not trigger a downstream event; they are used only when first stream emits. You can achieve such behavior by using the new
withLatestFrom() operator. Let’s illustrate it with the same pair of slow and fast streams:
In the prior example, the
slow stream is primary: the resulting
Observable will always emit an event when
slow emits, provided that
fast has emitted at least one element so far. Conversely, the
fast stream is just a helper used only when
slow emits something.
The function passed as the second argument to
withLatestFrom() will combine every new value from
slow with the most recent value from
fast. However, new values from
fast are not propagated downstream; they are just remembered internally until a new
slow event appears. The output of the preceding code snippet reveals that all
slow events appear exactly once, whereas some
fast events are dropped:
slow events appearing before the first fast event are silently dropped because there is nothing with which to combine them. This is by design, but if you truly need to preserve all events from the primary stream, you must ensure that the other stream emits some dummy event as soon as possible.
For example, you can prepend said stream with some dummy event emitted immediately. The example that follows artificially slows the
fast stream by pushing all events 100 milliseconds forward. Without a dummy event, we would lose a few slow events; however, by using the
startWith() operator we create a new
Observable that derives from
fast. It starts with
"FX" immediately and then continues with events from the original fast stream.
The output reveals that no
slow events are dropped. However, in the beginning we see dummy
"FX" events a few times, until the first
"F0" shows up after 100 milliseconds:
startWith() basically returns a new
Observable that, upon subscription, first emits some constant values (like
"FX") followed by the original
Observable. For example, the following code block yields
0, 1, and 2, in that order:
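A sketch of that behavior:

```java
import rx.Observable;

Observable
        .just(1, 2)
        .startWith(0)   // prepend a constant value before the original events
        .subscribe(System.out::println);  // prints 0, 1, 2
```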
The last tiny operator that can become useful is
amb() (together with
ambWith()), which subscribes to all upstream
Observables it controls and waits for the very first item emitted. When one of the
Observables emits the first event,
amb() discards all other streams and just keeps forwarding events from the first
Observable that woke up, as shown in the following marble diagram:
The sample that follows illustrates how
amb() works with two streams. Pay attention to the
initialDelay parameter that controls which
Observable starts emitting first:
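A sketch: a hypothetical stream() helper builds each stream with a configurable initial delay, and the doOn…() callbacks reveal subscription and unsubscription:

```java
import rx.Observable;
import java.util.concurrent.TimeUnit;

Observable<String> stream(int initialDelay, int interval, String name) {
    return Observable
            .interval(initialDelay, interval, TimeUnit.MILLISECONDS)
            .map(x -> name + x)
            .doOnSubscribe(() -> System.out.println("Subscribe to " + name))
            .doOnUnsubscribe(() -> System.out.println("Unsubscribe from " + name));
}

// slow "S" wins: its first event appears after 100 ms, before fast "F" at 200 ms
Observable
        .amb(stream(100, 17, "S"), stream(200, 10, "F"))
        .subscribe(System.out::println);
```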
You can write an equivalent program using nonstatic
ambWith(), but it is less readable because it hides the symmetry of
amb(). It seems like we are applying the second stream on top of the first, whereas both of them should be treated equally:
No matter which version you prefer, they yield the same results. The
slow stream produces events less frequently, but the first event appears after 100 milliseconds, whereas the
fast stream begins after 200 milliseconds. What
amb() does is first subscribe to both
Observables, and when it encounters the first event in the
slow stream, it immediately unsubscribes from the
fast one and forwards events from only the slow one:
doOnUnsubscribe() callbacks are useful for debugging purposes. Notice how unsubscription from
F occurs roughly 100 milliseconds after subscription to
S; this is the moment when the first event from the S
Observable appeared. At this point, listening for events from
F no longer makes any sense.
Some operators allow more advanced transformations such as scanning through the sequence and aggregating some value along the way, like a running average. Some operators are even stateful, managing internal state while the sequence progresses. This is how
distinct() works, caching and discarding already-visited values.
All operators we explored so far operated on a per-event basis (e.g., filtering, mapping, or zipping). But sometimes you want to aggregate events to shrink the initial stream or simplify it. For example, consider an
Observable<Long> that monitors progress of data transfer. Every time a chunk of data is sent, a single
Long value appears, indicating the size of that chunk. This is a useful bit of information, but what we really want to know is how many bytes were transferred in total. A very bad idea is to use global state modified inside an operator:
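A sketch of the anti-pattern described, with the transfer stream stubbed out; do not write code like this:

```java
import rx.Observable;
import java.util.concurrent.atomic.AtomicLong;

// BROKEN: shared mutable state updated from inside a subscriber
Observable<Long> transferFile = Observable.just(10L, 14L, 12L, 13L, 14L, 16L);

AtomicLong totalProgress = new AtomicLong();
transferFile.subscribe(chunk -> {
    long total = totalProgress.addAndGet(chunk);
    System.out.println(total);
});
```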
The preceding code can lead to very unpleasant concurrency bugs, just like any other shared state. Lambda expressions within operators can be executed from arbitrary threads, so global state must be thread safe. We must also take laziness into account. RxJava tries to minimize global state and mutability as much as possible by providing composable operators. Modifying global state is tricky, even with Rx guarantees.
Moreover, we can no longer rely on Rx operators to further compose
total, for example by periodically updating the user interface. Signaling when a transfer is completed is also more complex. What we really want is a way to incrementally accumulate sizes of data chunks and report the current total every time a new chunk appears. This is what our hypothetical stream should look like:
You can implement this relatively complex workflow easily by using the scan() operator.
scan() takes two parameters: the last generated value (known as the accumulator) and the current value from the upstream
Observable. In the first iteration,
total is simply the first item from
progress, whereas in each subsequent iteration it becomes the result of applying the accumulator function to the previous
scan() result and the next upstream value.
scan() is like a bulldozer, going through the source (upstream)
Observable and accumulating items. An overloaded version of
scan() can provide an initial value (if it is different from simply the first element):
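A sketch of the factorials stream described next:

```java
import rx.Observable;
import java.math.BigInteger;

Observable<BigInteger> factorials = Observable
        .range(2, 100)
        // seed the accumulator with ONE; multiply by each upstream value
        .scan(BigInteger.ONE, (big, cur) -> big.multiply(BigInteger.valueOf(cur)));
```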
The factorials stream will generate
1, 2, 6, 24, 120, 720…, and so forth. Notice that the upstream
Observable starts from
2 but the downstream starts from
1, which was our initial value (
BigInteger.ONE). The rule of thumb is that the type of resulting
Observable is always the same as the type of the accumulator. So, if you do not provide a custom initial value for the accumulator, the type
T returned from
scan() will not change. Otherwise (like in our
factorials example), the result is of type
Observable<BigInteger> because BigInteger was the type of the initial value.
Sometimes, we do not care about intermediate results, just the final one. For example, we want to calculate total bytes transferred, not intermediate progress. Or, we would like to accumulate all values in some mutable data structure, like
ArrayList, adding one item at a time. The
reduce() operator was designed precisely for that. One rather obvious caveat: if your sequence is infinite,
scan() keeps emitting events for each upstream event, whereas
reduce() will never emit any event.
Imagine that you have a source of
CashTransfer objects with
getAmount() method returning
BigDecimal. We would like to calculate the total amount of all transfers. The following two transformations are equivalent. They iterate over all transfers and add up the amounts, beginning at zero:
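A sketch of both equivalent transformations, with a stub CashTransfer type:

```java
import rx.Observable;
import java.math.BigDecimal;

class CashTransfer {
    final BigDecimal amount;
    CashTransfer(BigDecimal amount) { this.amount = amount; }
    BigDecimal getAmount() { return amount; }
}

Observable<CashTransfer> transfers = Observable.just(
        new CashTransfer(new BigDecimal("10.00")),
        new CashTransfer(new BigDecimal("2.50")));

// One big transformation...
Observable<BigDecimal> total1 = transfers
        .reduce(BigDecimal.ZERO,
                (totalSoFar, transfer) -> totalSoFar.add(transfer.getAmount()));

// ...or two smaller, more composable steps
Observable<BigDecimal> total2 = transfers
        .map(CashTransfer::getAmount)
        .reduce(BigDecimal.ZERO, BigDecimal::add);
```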
Both transformations yield the same result, but the second one seems simpler, despite using two steps. This is another reason to prefer smaller, more composable transformations over a single big one. Also you can probably see that
reduce() is basically
scan() taking only the last element. As a matter of fact, you can implement it as follows:
Now, let’s transform the finite stream of events of type
T into a stream with just a single event of type
List<T>. Of course, that event is emitted when the upstream completes:
This example of
reduce() simply begins with an empty
ArrayList<Integer> (an accumulator) and adds every emitted item to that
ArrayList. The lambda expression responsible for reduction (accumulating) must return a new version of the accumulator. Unfortunately,
List.add() does not return said
List; instead, it returns a
boolean, so an explicit return statement is required. To overcome this verbosity, you can use the
collect() operator. It works almost exactly like
reduce() but assumes that we use the same mutable accumulator for every event as opposed to returning a new immutable accumulator every time:
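A sketch of collect() accumulating into a single mutable ArrayList:

```java
import rx.Observable;
import java.util.ArrayList;
import java.util.List;

Observable<List<Integer>> all = Observable
        .range(10, 20)
        // one shared ArrayList; List::add mutates it, no return value needed
        .collect(ArrayList::new, List::add);
```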
Another common use case for
collect() is aggregating all events into a
StringBuilder. In that case, the accumulator is an empty
StringBuilder and an operation appends one item to that builder:
Just like every
Observable operator, both
reduce() and collect() are nonblocking, so the resulting
List<Integer> containing all numbers emitted from
Observable.range(10, 20) will appear when the upstream signals completion; exceptions are propagated normally. Transforming an
Observable<T> into an Observable<List<T>> is so common that a built-in
toList() operator exists.
By the way, some
Observables by definition must emit exactly one value. For example, the preceding code snippet will always emit one
List<Integer>, even an empty one. In such circumstances, it is worthwhile to apply a
single() operator. It does not change the upstream
Observable in any way; however, it makes sure it emits exactly one event. In case this assumption is wrong, you will receive an exception instead of an unexpected result.
An infinite stream of random values can be really useful, typically when combined with other streams. The following
Observable produces pseudo-random
Integer values from 0 to 1,000 exclusive:
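A sketch of such a stream; the create() loop keeps producing values until the subscriber unsubscribes:

```java
import rx.Observable;
import java.util.Random;

Observable<Integer> randomInts = Observable.create(subscriber -> {
    Random random = new Random();
    while (!subscriber.isUnsubscribed()) {
        subscriber.onNext(random.nextInt(1000));  // 0 (inclusive) to 1,000 (exclusive)
    }
});
```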
Obviously, duplicates can occur, and
take(1001) is guaranteed to contain at least one duplicate. But what if we want to take a peek at a smaller number (say, 10) of unique random values? The built-in
distinct() operator automatically discards events from upstream
Observable that already occurred, making sure only unique events are passed downstream:
Every time a new value is emitted from the upstream, the
distinct() operator internally makes sure such a value did not occur before. The comparison happens by means of
equals() and hashCode(), so ensure that you implement them according to Java guidelines (two equal objects must have the same hash code). Interestingly, distinct().take(1001) would eventually emit every single value from 0 to 999 in random order and never complete, because there is no 1,001st unique int between 0 and 999.
In “Use Case: From Callback API to Observable Stream”, we looked at an
Observable<twitter4j.Status> that was emitting status updates generated on the social media website Twitter. Every time any user posted a status update, a new event was pushed from that Observable. The
Status object contains several attributes, such as
getUser(), and so on. The
distinct() operator makes no sense for
Status events, given that duplicates are virtually impossible. But what if we would like to see the text of only the very first update from each user (identified by a
long user ID)? Obviously, we can extract that unique property and run
distinct() on it:
Unfortunately, by the time we get to execute
distinct(), the original
Status object is lost. What we really need is a way to extract the property of an event used to determine uniqueness. Two events are considered equal (the latter being discarded as a result) if that extracted property (known as a key) was already seen:
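A sketch of the key-extractor overload of distinct() (RxJava 1.x assumed; the Status class below is a hypothetical stand-in for twitter4j.Status, reduced to a user ID and a text):

```java
import rx.Observable;

import java.util.List;

public class DistinctByKey {

    // hypothetical stand-in for twitter4j.Status
    static final class Status {
        final long userId;
        final String text;

        Status(long userId, String text) {
            this.userId = userId;
            this.text = text;
        }
    }

    public static List<String> firstUpdatePerUser() {
        return Observable.just(
                        new Status(1, "hello"),
                        new Status(2, "hi"),
                        new Status(1, "again"))     // second event from user 1, discarded
                .distinct(status -> status.userId)  // key extractor: uniqueness by user ID
                .map(status -> status.text)         // the original Status is still available
                .toList()
                .toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(firstUpdatePerUser()); // [hello, hi]
    }
}
```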
Whatever we return as the key is compared, using
equals() and hashCode(), against keys already seen. Be sure to remember that
distinct() must keep in mind all events/keys seen so far for eternity. For that reason,
distinctUntilChanged() is often more reasonable. In the case of
distinctUntilChanged(), any given event is discarded only if the previous event was the same (by default using
equals() for comparison).
distinctUntilChanged() works best when we receive a steady stream of some measurements and we want to be notified only when the measured value actually changed.
The preceding code snippet emits a Weather event only when the temperature changes (changes to Wind are not taken into account). Obviously, if we want to emit an event every time either the temperature or the
Wind changes, the parameterless
distinctUntilChanged() would work great, assuming that Weather correctly implements
equals(). The important difference between distinct() and
distinctUntilChanged() is that the latter can produce duplicates, but only if they were separated by a different value.
For example, the same temperature might occur every day, separated by colder and warmer measurements. Also,
distinctUntilChanged() must remember only the last seen value, as opposed to
distinct(), which must keep track of all unique values since the beginning of the stream. This means that
distinctUntilChanged() has a predictable, constant memory footprint, as opposed to distinct().
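A sketch of that behavior (RxJava 1.x assumed; plain ints stand in for temperature readings). Note how 20 appears twice in the output: distinctUntilChanged() allows duplicates as long as a different value occurred in between:

```java
import rx.Observable;

import java.util.List;

public class DistinctUntilChangedDemo {

    public static List<Integer> temperatureChanges() {
        // a steady stream of measurements; only actual changes pass through
        return Observable.just(20, 20, 21, 21, 21, 20)
                .distinctUntilChanged()
                .toList()
                .toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(temperatureChanges()); // [20, 21, 20]
    }
}
```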
You are never obligated to read the stream fully, especially but not exclusively when dealing with hot infinite
Observables. As a matter of fact, it is a common practice to slice
Observable and consume just a small subset.
The take(n) operator truncates the source
Observable prematurely after emitting only the first
n events from upstream, unsubscribing afterward (or completes earlier if the upstream did not have that many events).
skip(n) is the exact opposite; it discards the first
n elements and begins emitting events from the upstream
Observable starting with event
n+1. Both operators are quite liberal: negative numbers are treated as zero, and exceeding the
Observable's size is not treated as a bug:
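A sketch of both operators, including their liberal handling of oversized counts (RxJava 1.x assumed):

```java
import rx.Observable;

import java.util.List;

public class TakeSkipDemo {

    public static List<Integer> firstThree() {
        return Observable.range(1, 5).take(3).toList().toBlocking().single();
    }

    public static List<Integer> allButFirstThree() {
        return Observable.range(1, 5).skip(3).toList().toBlocking().single();
    }

    public static List<Integer> takeTooMany() {
        // requesting more than the stream holds is not an error
        return Observable.range(1, 5).take(100).toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(firstThree());       // [1, 2, 3]
        System.out.println(allButFirstThree()); // [4, 5]
        System.out.println(takeTooMany());      // [1, 2, 3, 4, 5]
    }
}
```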
Another self-descriptive pair of operators.
takeLast(n) emits only the last
n values from the stream before it completes. Internally, this operator must keep a buffer of the last
n values and when it receives completion notification, it immediately emits the entire buffer.
It makes no sense to call
takeLast() on an infinite stream because it will never emit anything—the stream never ends, so there are no last events.
skipLast(n), on the other hand, emits all values from the upstream
Observable except the last n.
skipLast() can emit the first value from upstream only when it has received
n+1 elements, the second when it has received
n+2, and so on.
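A sketch of the pair on a finite stream (RxJava 1.x assumed):

```java
import rx.Observable;

import java.util.List;

public class TakeSkipLastDemo {

    public static List<Integer> lastTwo() {
        // buffers the last 2 values, emits them on completion
        return Observable.range(1, 5).takeLast(2).toList().toBlocking().single();
    }

    public static List<Integer> allButLastTwo() {
        // emits value k only after value k+2 has arrived
        return Observable.range(1, 5).skipLast(2).toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(lastTwo());       // [4, 5]
        System.out.println(allButLastTwo()); // [1, 2, 3]
    }
}
```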
The first() and last() operators can be implemented via
take(1).single() and takeLast(1).single(), respectively, which should pretty much describe their behavior. The extra
single() operator ensures that the downstream Observable emits precisely one value or an exception. Additionally, both
first() and last() have overloaded versions that take predicates. Rather than returning the very first/last value, they emit the first/last value matching a given condition.
The takeFirst(predicate) operator can be expressed by
filter(predicate).take(1). The only difference between this one and
first(predicate) is that it will not break with a
NoSuchElementException in case of missing matching values.
takeUntil(predicate) and takeWhile(predicate) are closely related to each other.
takeUntil() emits values from the source
Observable but completes and unsubscribes after emitting the very first value matching the predicate.
takeWhile(), conversely, emits values as long as they match a given
predicate. So the only difference is that
takeUntil() will emit the first nonmatching value, whereas
takeWhile() will not.
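A side-by-side sketch of that one-element difference (RxJava 1.x assumed):

```java
import rx.Observable;

import java.util.List;

public class TakeUntilWhileDemo {

    public static List<Integer> untilNegative() {
        // takeUntil emits the first matching (negative) value, then completes
        return Observable.just(1, 2, -1, 3)
                .takeUntil(x -> x < 0)
                .toList().toBlocking().single();
    }

    public static List<Integer> whilePositive() {
        // takeWhile completes before the first nonmatching value
        return Observable.just(1, 2, -1, 3)
                .takeWhile(x -> x > 0)
                .toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(untilNegative()); // [1, 2, -1]
        System.out.println(whilePositive()); // [1, 2]
    }
}
```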
These operators are quite important because they provide a means of conditionally unsubscribing from an
Observable based on the events being emitted. Otherwise, the operator would need to somehow interact with the
Subscription instance, which is not available when the operator is invoked.
Extracting a specific item by index is rather uncommon, but you can use the built-in
elementAt(n) operator for that. It is quite strict, and it can result in an
IndexOutOfBoundsException being emitted when upstream
Observable is not long enough or the index is negative. Of course, it returns
Observable<T> of the same type
T as upstream.
Many operators in this section are strict and can result in exceptions being thrown—for example,
first() when upstream
Observable is empty. Under these circumstances many
...OrDefault operators were introduced to replace exceptions with a default value. All of them are rather self-explanatory:
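For instance (a sketch assuming RxJava 1.x), firstOrDefault() turns the empty-stream exception into a fallback value:

```java
import rx.Observable;

public class OrDefaultDemo {

    public static int firstOrFallback() {
        return Observable.<Integer>empty()
                .firstOrDefault(-1)   // instead of a NoSuchElementException, emit -1
                .toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(firstOrFallback()); // -1
    }
}
```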
count() is an interesting operator that calculates how many events were emitted by the upstream
Observable. By the way, if you need to know how many items matching a given predicate the upstream Observable emits,
filter(predicate).count() can do that idiomatically. Do not worry, all operators are lazy, so this will work even for quite large streams. Obviously,
count() never emits any value in case of an infinite stream. You can implement
count() easily by using reduce():
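A sketch of count() expressed as a fold (RxJava 1.x assumed): start at 0 and add one per event, ignoring the event itself:

```java
import rx.Observable;

public class CountDemo {

    public static int countViaReduce() {
        return Observable.just('A', 'B', 'C')
                .reduce(0, (acc, ignored) -> acc + 1) // one increment per event
                .toBlocking().single();
    }

    public static int countBuiltIn() {
        return Observable.just('A', 'B', 'C')
                .count()
                .toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(countViaReduce()); // 3
        System.out.println(countBuiltIn());   // 3
    }
}
```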
Sometimes, it is useful to ensure that all events from a given
Observable match some predicate. The
all(predicate) operator will emit
true when the upstream completes and all values matched the predicate. However,
false will be emitted as soon as the first nonconforming value is found.
exists(predicate) is the exact opposite of
all(); it emits
true when the first matching value is found but
false in case of the upstream completing without any matching value found. Often, our predicate in
exists() simply compares upstream values with some constant. In that case, you can use the contains() operator instead:
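A sketch of the three operators together (RxJava 1.x assumed):

```java
import rx.Observable;

public class ExistsContainsDemo {

    static final Observable<Integer> numbers = Observable.just(1, 2, 3);

    public static boolean anyGreaterThanTwo() {
        // true as soon as the first matching value appears
        return numbers.exists(x -> x > 2).toBlocking().single();
    }

    public static boolean allGreaterThanTwo() {
        // false as soon as the first nonconforming value appears
        return numbers.all(x -> x > 2).toBlocking().single();
    }

    public static boolean containsTwo() {
        // shorthand for exists(x -> x.equals(2))
        return numbers.contains(2).toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(anyGreaterThanTwo()); // true
        System.out.println(allGreaterThanTwo()); // false
        System.out.println(containsTwo());       // true
    }
}
```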
The concat() operator (and its instance-method counterpart
concatWith()) allows joining together two
Observables: when the first one completes,
concat() subscribes to the second one. Importantly,
concat() will subscribe to the second
Observable if, and only if, the first one has completed.
concat() can even work with the same upstream
Observable with different operators applied. For example, if we would like to receive only the first few and the last few items from a very long stream, we could use the following:
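A sketch of such slicing (RxJava 1.x assumed; a 100-element range stands in for the very long stream):

```java
import rx.Observable;

import java.util.List;

public class ConcatSliceDemo {

    public static List<Integer> headAndTail() {
        Observable<Integer> veryLong = Observable.range(1, 100);
        // note: veryLong ends up subscribed to twice, once per slice
        return Observable
                .concat(veryLong.take(3), veryLong.takeLast(3))
                .toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(headAndTail()); // [1, 2, 3, 98, 99, 100]
    }
}
```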
Keep in mind that the preceding code example subscribes to veryLong twice, which might be undesirable. Another use of
concat() is providing a fallback value when the first stream did not emit anything:
Observables are lazy, so neither
loadFromCache() nor loadFromDb() actually loads any data yet.
loadFromCache() can complete without emitting any events when the cache is empty, but
loadFromDb() always emits one Car.
concat() followed by
first() will initially subscribe to
fromCache and, if that emits one item,
concat() will not subscribe to
fromDb. However, if fromCache is empty,
concat() will continue with
fromDb, subscribe to it, and load data from the database.
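A sketch of that fallback pattern (RxJava 1.x assumed; loadFromCache() and loadFromDb() are hypothetical stubs, with Strings standing in for Car objects):

```java
import rx.Observable;

public class ConcatFallbackDemo {

    // hypothetical stubs: the cache is empty, the database always has one value
    static Observable<String> loadFromCache() { return Observable.empty(); }
    static Observable<String> loadFromDb()    { return Observable.just("car-from-db"); }

    public static String loadAny() {
        return Observable
                .concat(loadFromCache(), loadFromDb())
                .first()                 // completes after the first value; fromDb is
                .toBlocking().single();  // subscribed only because fromCache was empty
    }

    public static void main(String[] args) {
        System.out.println(loadAny()); // car-from-db
    }
}
```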
The concat() operator is actually closely related to
merge() and switchOnNext().
concat() works like concatenation on an ordinary
List<T>: first it takes all items from the first stream, and only when that completes does it begin consuming the second stream. Of course, like all operators we have met so far,
concat() is nonblocking; it emits events only when the underlying stream emits something. Now, let’s compare it against
merge() and switchOnNext(), the latter just being introduced.
Consider a group of people, each one holding a microphone. Every microphone is modeled as an
Observable<String>, in which each event represents a single word. Obviously, events appear over time, as soon as the words are spoken. To simulate this behavior, we will construct a simple
Observable for demonstration purposes, interesting on its own:
We take an arbitrary text as a
String and split it into words, removing punctuation using a regular expression. Now, for each word we calculate how long it takes to say that word, simply by multiplying the word length by
millisPerChar. Then, we would like to spread the words over time, so that each word appears in the resulting stream after the delay calculated in the preceding example. Clearly, a simple
from operator is not enough:
We want words to appear with delay, based on the length of the previous word. The first naive approach simply delays each word, given its length:
This solution is incorrect. The
Observable will first emit all one-letter words at the same time. Then, after a while, all two-letter words followed by all three-letter words. What we want is to have the first word appear immediately and then the second word after a delay, depending on the length of the first word.
This sounds terribly complex but turns out to be quite pleasant. First we create a helper stream from
words that contains only relative delays induced by each word:
If millisPerChar is 100 and the
words are “Though this be madness,” we first get the following stream: 600, 400, 200, 700. If we were to simply
delay() each word by that duration, the word “be” would appear first and the other words would be scrambled as well. What we really want is a cumulative sequence of absolute delays, like this: 600; 600 + 400 = 1,000; 1,000 + 200 = 1,200; 1,200 + 700 = 1,900. This is easy using the scan() operator:
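A sketch of that running sum (RxJava 1.x assumed; the relative delays for “Though this be madness” at 100 ms per character are hardcoded here):

```java
import rx.Observable;

import java.util.List;

public class ScanDelaysDemo {

    public static List<Long> absoluteDelays() {
        // relative delays per word: 600, 400, 200, 700
        return Observable.just(600L, 400L, 200L, 700L)
                // scan emits every intermediate sum, not just the final one
                .scan((total, next) -> total + next)
                .toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(absoluteDelays()); // [600, 1000, 1200, 1900]
    }
}
```

Unlike reduce(), scan() pushes each partial accumulator downstream, which is exactly what turns relative delays into absolute ones.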
Now that we have a sequence of words and a sequence of absolute delays for each one of them, we can
zip these two streams. This is the kind of situation in which zip() shines.
It makes a lot of sense because we know the two streams have exactly the same size and are entirely in sync with each other. Well…almost. We do not want the first word to be delayed at all. Instead, the length of the first word should influence the delay of the second word, the total length of the first and second words should influence the delay of the third word, and so on. You can achieve such a shift easily by simply prepending a zero to the stream of delays:
We construct a sequence of pairs (each word with its absolute delay), making sure the first word is not delayed at all. These pairs might look as follows:
This is our speech timeline, each word accompanied by its point in time. All we need to do is turn every pair into a one-element
Observable shifted in time:
After so much preparation, we can finally see how
merge(), concat(), and switchOnNext() differ. Suppose that three people were quoting Hamlet by William Shakespeare:
As you can see, each person has a slightly different pace measured in
millisPerChar. What happens if all people speak at the same time? RxJava can answer this question:
The output is very chaotic, words spoken by each person interleave with each other. All we hear is noise, and without prefixing each phrase, it would have been difficult to understand:
This is how
merge() works: it subscribes to words of each person immediately and forwards them downstream, no matter which person is speaking. If two streams emit an event at more or less the same time, they are both forwarded right away. There is no buffering or halting events within this operator.
The situation is much different if we replace
merge() with concat(): now the order is perfect.
concat(alice, bob, jane) first subscribes to
alice and keeps forwarding events from that first
Observable until it is exhausted and completed. Then,
concat() switches to bob, and finally to jane.
Think about hot and cold
Observables for a while. In case of
merge(), all events from all streams are forwarded because
merge() subscribes eagerly to every stream. However,
concat() subscribes just to the first stream, so in the case of hot
Observables, you might expect a different outcome. By the time the first
Observable has completed, the second one might be sending an entirely different sequence of events. Keep in mind that
concat() does not buffer the second
Observable until the first one completes; instead, it simply subscribes lazily.
switchOnNext() is an entirely different way of combining operators. Imagine that you have an
Observable<Observable<T>> that is a stream of events for which each event is a stream on its own.
This situation actually makes sense, for example, if you have a set of mobile phones connecting to and disconnecting from the network (the outer stream). Each new connection is an event, and every such event is itself a stream of independent heartbeat messages (the inner streams).
In our case, we will have an
Observable<Observable<String>>, where each inner stream is a quote from a different person:
First, we wrap the individual
Observables into an
Observable<Observable<String>>. Let us reiterate: the quotes
Observable emits three events immediately, each event being an inner
Observable<String>. Every inner
Observable<String> represents the words spoken by one person.
To illustrate how
switchOnNext() works, we shall delay the emission of inner
Observables. We are not delaying each word within that
Observable (variant A) but the entire
Observable (variant B is subtly different):
In variant A, the
Observable appears immediately in the outer stream but begins emitting events with some delay. In variant B, on the other hand, we shift the entire
Observable event forward in time so that it appears in the outer
Observable much later.
Now for the reason why we needed such a complex setup: both the static
merge() and switchOnNext() operators can work with either a fixed list of
Observables or a stream of Observables (an Observable<Observable<T>>). In the case of
switchOnNext(), only the latter makes sense.
switchOnNext() begins by subscribing to an outer
Observable<Observable<T>>, which emits inner
Observable<T>s. As soon as the first inner
Observable<T> appears, this operator subscribes to it and begins pushing events of type T downstream. Now, what happens if the next inner Observable<T> appears?
switchOnNext() discards the first
Observable<T> by unsubscribing from it and switches to the next one (thus the name). In other words, when we have a stream of streams,
switchOnNext() always forwards downstream events from the most recent inner stream, even if older streams keep emitting fresh events.
This is how it looks in our Hamlet quoting example:
One of the possible outcomes, due to the random nature of this example, could look like this:
Each person starts speaking with a random delay of zero to four seconds. In this particular round, Jane’s
Observable<String> started first, but after citing a few words, Alice’s
Observable<String> appeared in the outer
Observable. At this point,
switchOnNext() unsubscribes from
jane, and we never hear the rest of her quote. Her
Observable is discarded and ignored;
switchOnNext() listens only to
alice at the moment. However, this inner
Observable is again interrupted when Bob’s quote appears. Theoretically,
switchOnNext() could produce all of the events from the inner
Observables if they did not overlap, completing before the next one appears.
Now, what would happen if we delayed only the events in every inner
Observable (variant A, remember?) rather than delaying the
Observables themselves? Well, all three inner
Observables would appear at the same time in the outer stream, and
switchOnNext() would subscribe to only one of them.
One of the techniques often used together with domain-driven design is event sourcing. In this architectural style, data is not stored as a snapshot of the current state and mutated in place (that is, using SQL UPDATE queries).
Instead, a sequence of immutable domain events (facts) describing what already happened is kept in an append-only data store. Using this design, we never overwrite any data, effectively getting an audit log for free. Moreover, the only way to see the data in real time is by applying these facts one after another, starting from an empty view.
The process of applying events on top of an initial empty state is known as projection in event sourcing. A single source of facts can drive multiple different projections. For example, we might have a stream of facts related to a reservation system, like
TicketBought—the past tense is important because facts always reflect actions and events that already occurred. From a single stream of facts (also being the single source of truth), we can derive multiple projections, such as the following:
- List of all confirmed reservations
- List of reservations canceled today
- Total revenue per week
When the system evolves, we can discard old projections and build new ones, taking advantage of data collected eagerly in facts. Suppose that you would like to build a projection containing all reservations together with their status.
Obviously, the stream of
facts is expressed as
Observable. Some other part of the system receives API calls or web requests, reacts (e.g., charges the customer’s credit card) and stores facts (domain events) about what happened. Other parts of the system (or even other systems!) can consume these facts by subscribing to a stream and building a snapshot of current system state from some arbitrary perspective.
Our code is quite simple: for each
ReservationEvent we load a
Reservation from our projection’s data store. If the
Reservation was not found, it means that this is the very first event associated with this
UUID, so we begin with an empty
Reservation. Then, we pass the fact to the
Reservation object, which can update itself to reflect any type of fact. Finally, we store the updated Reservation back in the data store.
This brief introduction to event sourcing will help us to understand why the
groupBy() operator is useful. After a while, we discover that updates to the
Reservation projection fall behind; we cannot keep up with the rate of facts being generated. The data store can easily handle concurrent reads and updates, so we can try to parallelize the handling of facts:
In this case, we consume facts in parallel, or to be more precise: receiving is sequential, but handling (in
updateProjectionAsync()) is possibly asynchronous.
updateProjectionAsync() alters the state of supplied
Reservation objects inside a projection. But looking at how
updateProjection() was implemented, we quickly see a possible race condition: two threads can consume different events, modify the same
Reservation, and try to store it—but the first update is overwritten and effectively lost.
This is where
groupBy() comes in handy. It splits a stream, based on some key, into multiple parallel streams, each holding events with a given key. In this case, we want to split one huge stream of all facts regarding reservations into a large number of smaller streams, each emitting only events related to a specific reservation (UUID):
This example contains quite a few new constructs. First, we take the upstream
Observable<ReservationEvent> stream and group it by the reservation’s UUID.
You might first expect that
groupBy() should return a
List<Observable<ReservationEvent>>—after all, we transform a single stream into multiple ones. This assumption breaks when you realize that
groupBy() cannot possibly know how many different keys (
UUIDs) the upstream will generate.
Therefore, it must produce them on the fly: whenever a new
UUID is discovered, a new
GroupedObservable<UUID, ReservationEvent> is emitted, pushing events related to that
UUID. So it becomes clear that the outer data structure must be an Observable as well.
But what is this
GroupedObservable<UUID, ReservationEvent> anyway?
GroupedObservable is a simple subclass of
Observable that, apart from the standard
Observable contract, also exposes the key to which all events in that stream belong (a
UUID, in our case).
The number of emitted
GroupedObservables can be anything from one (in the case of all events having the same key) to the total number of events (if each upstream event has a unique key). This is one of those cases for which nested
Observables are not that bad. When we subscribe to the outer
Observable, every emitted value is actually another
Observable (a GroupedObservable) to which you can subscribe. The events within each inner stream are related to one another (for example, they share the same correlation ID), whereas the inner streams are unrelated to one another and can be processed separately.
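A minimal sketch of groupBy() (RxJava 1.x assumed; word lengths stand in for reservation UUIDs). Each GroupedObservable is consumed immediately via flatMap(), which is the usual way to process the inner streams:

```java
import rx.Observable;

import java.util.Collections;
import java.util.List;

public class GroupByDemo {

    public static List<String> countPerKey() {
        List<String> counts = Observable.just("a", "bb", "cc", "d")
                .groupBy(String::length)        // one GroupedObservable per distinct length
                .flatMap(group -> group
                        .count()                // consume each inner stream independently
                        .map(n -> group.getKey() + ":" + n))
                .toList().toBlocking().single();
        Collections.sort(counts);               // group completion order is not guaranteed
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerKey()); // [1:2, 2:2]
    }
}
```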
There are dozens of other operators built in to RxJava. Many of them will be explained in Chapter 6, but going through the entire API is not very reasonable and quite time consuming. Also such an exhaustive description would become obsolete from version to version. However, you should have a basic understanding of what operators can do for you and how they work. The next logical step is writing custom operators.
We barely scratched the surface of available operators in RxJava, and you will learn many more throughout the course of this book. Moreover, the true power of operators comes from their composition. Following the UNIX philosophy of “small, sharp tools” each operator is doing one, small transformation at a time.
This section will first guide you through the
compose() operator, which allows fluent composition of smaller operators, and later introduces the
lift() operator, which helps you to write entirely new custom operators.
Let’s begin by looking at an example. For some reason, we want to transform an upstream Observable so that every other item is discarded and we receive only the odd-positioned items (1st, 3rd, 5th, etc.). In “Flow Control”, we will learn about the
buffer() operator that makes this task very simple (
buffer(1, 2) does almost exactly what we want). However, we will pretend that we do not know this operator yet; we can implement this functionality easily by composing several operators:
First, we generate an infinite stream that emits true and
false alternately. We can implement this easily by creating a fixed
[true, false] stream with just two items and then repeating it infinitely with the
repeat() operator. repeat() simply intercepts the completion notification from upstream and, rather than passing it downstream, resubscribes. Therefore, it is not guaranteed that
repeat() will keep cycling through the same sequence of events, but it happens to be the case when the upstream is a simple, fixed stream.
Then, we zipWith() our upstream
Observable with this infinite stream of true and
false values. However, zipping requires a function that combines two items. This is simpler in other languages; in Java, we help ourselves by using the Apache Commons Lang library, which provides a simple
Pair class. If you want to avoid a third-party library, an alternative implementation follows:
At first glance,
flatMap() looks kind of odd and doesn’t appear to be doing anything that is actually crucial. From the
zipWith() transformation we return an
Observable (with one element or empty), which leads to an
Observable<Observable<T>>. By using
flatMap() this way, we get rid of this nesting level—after all, a lambda expression in
flatMap() is supposed to return an
Observable for each input element, which happens to be exactly what we return.
But you can no longer fluently chain operators; in other words, you cannot say:
obs.op1().odd().op2(). Unlike C# (where reactive extensions originated) and Scala (via implicits), Java does not allow extension methods. But the built-in
compose() operator comes as close as possible.
compose() takes a function as an argument that is supposed to transform the upstream
Observable via a series of other operators. This is how it works in practice:
The odd() function returns a
Transformer<T, T>, transforming an
Observable<T> into another Observable<T> (of course, the types can be different). A
Transformer is a function on its own, so we can replace it with a lambda expression (
upstream -> upstream...).
Notice that the
odd() function is executed eagerly when the Observable is assembled, not during subscription. Interestingly, if you want to emit even values (2nd, 4th, 6th, etc.) rather than odd (1st, 3rd, 5th, etc.), simply swap the [true, false] pair to [false, true]:
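A self-contained sketch of the whole technique (RxJava 1.x assumed; the library-free zipWith/flatMap variant is used, so no Pair class is needed):

```java
import rx.Observable;

import java.util.List;

public class OddDemo {

    // reusable "every other element" transformation, pluggable via compose()
    static <T> Observable.Transformer<T, T> odd() {
        Observable<Boolean> trueFalse = Observable.just(true, false).repeat();
        return upstream -> upstream
                // pair each item with an alternating flag; keep only flagged items
                .zipWith(trueFalse, (item, keep) ->
                        keep ? Observable.just(item) : Observable.<T>empty())
                .flatMap(inner -> inner);  // flatten Observable<Observable<T>>
    }

    public static List<Character> oddLetters() {
        return Observable.just('A', 'B', 'C', 'D', 'E')
                .compose(odd())            // fluent chaining despite no extension methods
                .toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(oddLetters()); // [A, C, E]
    }
}
```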
Implementing custom operators is tricky because backpressure and the subscription mechanism need to be taken into account. Therefore, try your best to implement your requirements from existing operators rather than inventing your own. Built-in operators are much better tested and proven.
However, if none of the supplied operators work for you, the
lift() meta-operator will help.
compose() is only useful for grouping existing operators together. With
lift(), on the other hand, you can implement almost any operator, altering the flow of upstream events.
lift() allows transforming Subscribers. When you
subscribe() to an
Observable, a
Subscriber instance wrapping your callbacks travels up to the
Observable it subscribed to and causes the
create() method to be invoked with our subscriber as an argument (a gross simplification). So every time we subscribe, a
Subscriber travels up through all the operators to the original
Observable. Obviously, between the original Observable and
subscribe() there can be an arbitrary number of operators, altering events flowing downstream, as illustrated here:
But here is an interesting fact: if you look up the source code of RxJava and replace operator invocations with their bodies, this quite complex sequence of operators becomes very regular (notice how
reduce() is implemented in terms of simpler operators):
Almost all operators, excluding those working with multiple streams at once (like
flatMap()), are implemented by means of
lift(). When we
subscribe() at the very bottom, a
Subscriber<String> instance is created and passed to the immediate predecessor. It can be a “true”
Observable<String> that emits events or just the result of some operator,
map(Integer::toHexString) in our case.
map() itself does not emit events, yet it received a
Subscriber that wants to receive them. What
map() does (through the
lift() helper operator) is transparently subscribe to its parent (
reduce() in the preceding example). However, it cannot pass along the same
Subscriber instance it received. This is because the types do not match: the upstream emits Integers, whereas our Subscriber expects Strings.
After all, that is what
map() is doing here: transforming Integer into
String. So instead, the
map() operator creates a new, artificial
Subscriber<Integer>, and every time this special
Subscriber receives anything, it applies the
Integer::toHexString function and notifies the downstream Subscriber<String>.
This is essentially what the
OperatorMap class is doing: providing a transformation from a downstream (child)
Subscriber<R> into upstream
Subscriber<T>. Here is the real implementation found in RxJava, with some minor readability simplifications:
One unusual detail is the reversed order of the T and
R generic types. The
map() operator transforms values flowing from upstream of type
T to type
R. However, the operator’s responsibility is transforming a
Subscriber<R> (coming from the downstream subscription) into a
Subscriber<T> (passed to the upstream
Observable). We subscribe via a
Subscriber<R>, whereas the operator
map() is applied against an Observable<T>.
Keep in mind that until someone actually subscribes, we have merely created a new Observable:
lift(), like any other operator, creates a new
Observable with a reference to the
OperatorMap instance underneath, which in turn holds a reference to our function. Only when someone actually subscribes is the
call() function of
OperatorMap invoked. This function receives our
Subscriber<String> (e.g., wrapping
System.out::println) and returns another
Subscriber<Integer>. It is the latter
Subscriber that travels upstream to the preceding operators.
That is pretty much how all operators work, both built-in and custom. Each operator receives a
Subscriber and returns another one, enhancing and passing along whatever it wishes to the downstream Subscriber.
This time we would like to implement an operator that will emit
toString() of every odd (1st, 3rd, 5th, etc.) element. It is best explained with some sample code:
You can achieve the same functionality by using built-in operators; we are writing a custom operator just for educational purposes:
buffer() will be introduced in “Buffering Events to a List”; for the time being, all you need to know is that
buffer(1, 2) will transform any
Observable<T> into an Observable<List<T>>, where each inner
List has exactly one odd element and skips the even ones. Having a stream of lists like
List(1), List(3), and so on, we reconstruct a flat stream using
concatMapIterable(). But for the sake of the learning experience, let’s implement a custom operator that does that in a single step. The custom operator can be in one of two states:
- It either received an odd event (1st, 3rd, 5th, etc.) from upstream, which it forwards downstream after applying toString() to it
- It received an even event, which it simply discards
Then cycle repeats. The operator might look like this:
The request(1) invocation will be explained much later, in “Honoring the Requested Amount of Data”. For now, you can understand it like this: when a
Subscriber requests just a subset of events—for example, only the first two (
take(2))—RxJava takes care of requesting only that amount of data by calling request(2).
This request is passed upstream, and we receive barely 1 and
2. However, we drop
2 (an even element), yet we were obligated to provide two events downstream. Therefore, we must request one extra event (
request(1)) in addition to that so that we receive
3, as well. RxJava implements quite a sophisticated mechanism called backpressure that allows subscribers to request only the amount of events they can process, protecting them from producers outperforming consumers.
For better or worse,
null is a valid event value in RxJava; that is,
Observable.just("A", null, "B") is as good as any other stream. You need to take that into account when designing custom operators, as well as when applying operators. However, passing
null is generally considered nonidiomatic, and you should use wrapper value types instead.
Another interesting pitfall you might encounter is failing to provide a child
Subscriber as an argument to the new
Subscriber, like here:
The parameterless constructor of
Subscriber compiles fine, and again our operator seems to work. But let’s see how it goes with an infinite stream:
We build an infinite stream of numbers (1, 2, 3, 4, 1, 2, 3…), apply our operator (“1”, “3”, “1”, “3”…), and take only the first three values. This is absolutely fine and should never fail; after all, streams are lazy. But use the parameterless constructor instead of
new Subscriber(child), and our
Observable never notifies about completion after receiving 1, 3, 1. What happened?
The take(3) operator requested only the first three values and then wanted to
unsubscribe(). Unfortunately, the unsubscription request never made it to the original stream, which keeps producing values. Even worse, these values are processed by our custom operator and passed to the downstream Subscriber (created by
take(3)), which is not even listening anymore.
Implementation details aside, as a rule of thumb, pass the downstream
Subscriber as a constructor argument to the new
Subscriber when writing your own operators. The no-argument constructor is rarely used, and it is very unlikely that you will need it for simple operators.
This is just the tip of the iceberg with respect to the issues you can encounter when writing your own operators. Luckily, very seldom are we not able to achieve what we want to accomplish with built-in mechanisms.
The true power of RxJava lies in its operators. Declarative transformation of streams of data is safe, yet expressive and flexible. With a strong foundation in functional programming, operators play a deciding role in RxJava’s adoption.
Mastering the built-in operators is key to success with this library. But remember, we have not yet seen all of the operators—for example, see “Flow Control”. At this point, though, you should have a good overview of what RxJava can do and how to enhance it when it cannot do something directly.