Reactive Programming with RxJava: Operators and Transformations


The aim of this chapter is to explain the fundamentals of RxJava’s operators and how you can compose them to build high-level, easy-to-reason-about data pipelines. One of the reasons why RxJava is so powerful is the rich universe of built-in operators it provides and the possibility of creating custom ones. An operator is a function that takes an upstream Observable<T> and returns a downstream Observable<R>, where types T and R might or might not be the same. Operators allow composing simple transformations into complex processing graphs.

But even when you recognize how wonderful Rx operators are, the true power comes from combining them. Chaining several operators, forking a stream into multiple substreams, and then joining them back together is idiomatic, and you should feel fairly comfortable with it.

Core Operators: Mapping and Filtering

Operators are typically instance methods on Observable that somehow alter the behavior of the upstream Observable as seen by downstream Observables or Subscribers. This might sound complex, but it is actually quite flexible and not that difficult to grasp. One of the simplest examples of an operator is filter(), which takes a predicate and either passes events further or discards them:

Observable<String> strings = //...
Observable<String> filtered = strings.filter(s -> s.startsWith("#"));

It’s now time to introduce the so-called marble diagrams, the ubiquitous visualizations in RxJava documentation. A marble diagram illustrates how various operators work:

Marble Diagram

The diagram that follows is a concrete example of a marble diagram representing the filter() operator. Observable.filter() passes through the exact same events (so the marbles on top and on the bottom are the same), but some events are skipped because they did not satisfy the predicate:

Filter

When dealing with certain types of Observables, some events might be of no interest to you, for example when consuming high volumes of data. It is also a common practice to filter() the same Observable multiple times, each time with a different predicate.

Observable<String> strings = someFileSource.lines();
Observable<String> comments = strings.filter(s -> s.startsWith("#"));
Observable<String> instructions = strings.filter(s -> s.startsWith(">"));
Observable<String> empty = strings.filter(String::isBlank);

In RxJava, you must forget about mutating data structures internally: modifying variables outside of the stream is considered very nonidiomatic and dangerous. Every single operator returns a new Observable, leaving the original one untouched.

This makes reasoning about the flow of events much simpler. You can fork a stream into multiple independent sources, each having different characteristics. One of the powers of RxJava is that you can reuse a single Observable in multiple places without affecting other consumers. If you pass an Observable to some unknown function you can be sure that this Observable will not become corrupted in any way by that function.

1-to-1 Transformations Using map()

Imagine that you have a stream of some events and you must perform a certain transformation on each event. This can be decoding from JSON to a Java object (or vice versa), enriching, wrapping, extracting from the event, and so on. This is where the invaluable map() operator is useful.

import rx.functions.Func1;

Observable<Status> tweets = //...

//Most verbose: anonymous Func1<Status, Date> implementation
Observable<Date> dates = tweets.map(new Func1<Status, Date>() {
    @Override
    public Date call(Status status) {
        return status.getCreatedAt();
    }
});

//Lambda expression with an explicit parameter type
Observable<Date> dates =
    tweets.map((Status status) -> status.getCreatedAt());

//Lambda expression with an inferred parameter type
Observable<Date> dates =
    tweets.map(status -> status.getCreatedAt());

//Method reference
Observable<Date> dates =
    tweets.map(Status::getCreatedAt);

All of the ways in which you can define the dates Observable are equivalent, from the most verbose, using Func1<T, R>, to the most compact Java 8 syntax with a method reference and type inference.

Map

Observables are lazy, meaning that they do not begin producing events until someone subscribes. You can create infinite streams that take hours to compute the first value, but until you actually express your desire to be notified about these events, Observable is just a passive and idle data structure for some type T.

This even applies to hot Observables — even though the source of events keeps producing them, not a single operator like map() or filter() is evaluated until someone actually shows an interest. Otherwise, running all of these computational steps and throwing away the result would make no sense.
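
To see this laziness in action, here is a minimal sketch (the println side effect inside map() is there purely for demonstration):

Observable<Integer> numbers = Observable
    .just(1, 2, 3)
    .map(x -> {
        System.out.println("Mapping " + x); //side effect, for demonstration only
        return x * 10;
    });
//Nothing has been printed yet; the pipeline is dormant
numbers.subscribe(x -> System.out.println("Received " + x));
//Only now: Mapping 1, Received 10, Mapping 2, Received 20...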

Wrapping Up Using flatMap()

flatMap() is among the most important operators in RxJava. At first sight, it is similar to map() but the transformation of each element can return another (nested, inner) Observable.

Recognizing that Observable can represent another asynchronous operation, we quickly discover that flatMap() can be used to spawn asynchronous computation for each upstream event (fork execution) and join the results back. Conceptually flatMap() takes Observable<T> and a function from T to Observable<R>. flatMap() first constructs Observable<Observable<R>> replacing all upstream values of type T with Observable<R> (just like map()).

However, it does not stop there: it automatically subscribes to these inner Observable<R> streams to produce a single stream of type R, containing all values from all inner streams, as they come. The marble diagram that follows shows how this works:

Flat Map

Because RxJava subscribes to the inner streams concurrently and merges them together, events produced by one inner Observable can interleave with events from another.
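
In fact, you can sketch flatMap() as map() followed by merging the nested streams with the merge() operator (covered later in this chapter); for any function f from T to Observable<R>, the following two expressions are conceptually equivalent:

Observable<R> viaFlatMap = source.flatMap(f);
Observable<R> viaMergeMap = Observable.merge(source.map(f));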

flatMap() is the most fundamental operator in RxJava; using it, one can easily implement map() or filter():

import static rx.Observable.empty;
import static rx.Observable.just;

numbers.map(x -> x * 2);
numbers.filter(x -> x != 10);

//equivalent
numbers.flatMap(x -> just(x * 2));
numbers.flatMap(x -> (x != 10) ? just(x) : empty());

As a rule of thumb, you use flatMap() for the following situations:

  • The result of the transformation in map() must be an Observable; for example, performing a long-running, asynchronous operation on each element of the stream without blocking
  • You need a one-to-many transformation, in which a single event is expanded into multiple subevents; for example, a stream of customers is translated into streams of their orders, where each customer can have an arbitrary number of orders

Now imagine that you would like to use a method returning an Iterable (like List or Set). For example, if Customer has a simple List<Order> getOrders(), you are forced to go through several operators to take advantage of it in Observable pipeline:

Observable<Customer> customers = //...
Observable<Order> orders = customers
    .flatMap(customer ->
        Observable.from(customer.getOrders()));

Or, equivalently (and equally verbosely):

Observable<Order> orders = customers
    .map(Customer::getOrders)
    .flatMap(Observable::from);

The need to map from a single item to Iterable is so popular that an operator, flatMapIterable(), was created to perform just such a transformation:

Observable<Order> orders = customers
    .flatMapIterable(Customer::getOrders);

You must take care when simply wrapping methods in an Observable. If getOrders() was not a simple getter but an expensive operation in terms of run time, it is better to implement getOrders() to explicitly return Observable<Order>.

Another interesting variant of flatMap() can react not only to events, but to any notification: events, errors, and completion. The simplified signature of this flatMap() overload follows. For an Observable<T> we must provide the following:

  • A function mapping a single T → Observable<R>
  • A function mapping an error notification → Observable<R>
  • A no-arg function reacting on upstream completion that can return Observable<R>

<R> Observable<R> flatMap(
        Func1<T, Observable<R>> onNext,
        Func1<Throwable, Observable<R>> onError,
        Func0<Observable<R>> onCompleted);

Imagine that you are creating a service that uploads videos. It takes a UUID and reports upload progress via Observable<Long>: how many bytes have been transferred so far. We can take advantage of that progress along the way, for example by displaying it in the user interface. But what we are really interested in is completion, when the upload is finally done. Only after a successful upload can we begin rating the video. A naive implementation can simply subscribe to the progress stream, ignoring events and reacting only on completion (the last callback):

void store(UUID id) {
    upload(id).subscribe(
        bytes -> {},                 //ignore progress
        e -> log.error("Error", e),
        () -> rate(id)
    );
}

Observable<Long> upload(UUID id) {
    //...
}

Observable<Rating> rate(UUID id) {
    //...
}

However, notice that the rate() method actually returns an Observable<Rating> that gets lost. What we really want is for the store() method to return that second Observable<Rating>. But we can’t simply call upload() and rate() concurrently, because the latter will fail if the former has not finished yet. The answer is flatMap() again, in its most complex form:

upload(id)
    .flatMap(
        bytes -> Observable.empty(),
        e -> Observable.error(e),
        () -> rate(id)
    );

Take a moment to digest the preceding code snippet. We have an Observable<Long> as returned by the upload() method. For each progress update of type Long we return Observable.empty(), effectively discarding these events.

Moreover, we are not interested in errors, but rather than logging them, we pass them through to the subscriber. The rule of thumb is that if you don’t know how to handle an exception, let your supervisor (e.g., the calling method, parent task, or downstream Observable) make the decision.

Finally, the last lambda expression (() -> rate(id)) reacts upon stream completion. At this point, we replace completion notification with another Observable<Rating>. So, even if the original Observable wanted to terminate, we ignore that and in a way append a different Observable.

Keep in mind that all three callbacks must return Observable<R> of the same type R.

In practice, we do not replace map() and filter() with flatMap(), for reasons of code clarity and performance. Just to make sure you understand the syntactic part of flatMap(), another abstract example translates a sequence of characters to Morse code:

import static rx.Observable.empty;
import static rx.Observable.just;

Observable<Sound> toMorseCode(char ch) {
    switch(ch) {
        //lowercase labels: the input is mapped via Character::toLowerCase below
        case 'a': return just(DI, DAH);
        case 'b': return just(DAH, DI, DI, DI);
        case 'c': return just(DAH, DI, DAH, DI);
        //...
        case 'p': return just(DI, DAH, DAH, DI);
        case 'r': return just(DI, DAH, DI);
        case 's': return just(DI, DI, DI);
        case 't': return just(DAH);
        //...
        default:
            return empty();
    }
}

enum Sound { DI, DAH }

//...

just('S', 'P', 'A', 'R', 'T', 'A')
    .map(Character::toLowerCase)
    .flatMap(this::toMorseCode);

As you can clearly see, every character is replaced by a sequence of DI and DAH sounds (dots and dashes). When a character is not recognized, an empty sequence is returned. flatMap() ensures that we get a steady, flat stream of sounds, as opposed to the Observable<Observable<Sound>> we would get with plain map().

Postponing Events Using the delay() Operator

delay() basically takes an upstream Observable and shifts all events further in time.

import java.util.concurrent.TimeUnit;

just(x, y, z).delay(1, TimeUnit.SECONDS);

We can replace delay() with timer() and (surprise!) flatMap() like this:

Observable
    .timer(1, TimeUnit.SECONDS)
    .flatMap(i -> Observable.just(x, y, z));

I hope this is clear: we generate an artificial event from timer() that we completely ignore. However, using flatMap() we replace that artificial event (the zero held in i) with three immediately emitted values: x, y, and z. This is somewhat equivalent to just(x, y, z).delay(1, SECONDS) in this particular case; however, it is not so in general. delay() is more comprehensive than timer() because it shifts every single event further by a given amount of time, whereas timer() simply “sleeps” and emits a special event after the given time.
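
The difference becomes visible when the upstream events are themselves spread over time. The following sketch (ticks is a hypothetical source of three events, one second apart) contrasts the two:

Observable<Long> ticks = Observable.interval(1, TimeUnit.SECONDS).take(3);

//Shifts each of the three ticks by five seconds; they stay one second apart
ticks.delay(5, TimeUnit.SECONDS);

//Sleeps five seconds and then emits three values immediately,
//losing the original spacing between events
Observable.timer(5, TimeUnit.SECONDS)
    .flatMap(x -> Observable.just(0L, 1L, 2L));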

For completeness, let us mention an overloaded variant of delay() that can compute the amount of delay on a per-event basis rather than globally for every event. The following code snippet delays the emission of every String, depending on how long that String is:

import static rx.Observable.timer;
import static java.util.concurrent.TimeUnit.SECONDS;

Observable
    .just("Lorem", "ipsum", "dolor", "sit", "amet",
        "consectetur", "adipiscing", "elit")
    .delay(word -> timer(word.length(), SECONDS))
    .subscribe(System.out::println);

//Keep the JVM alive long enough to observe the delayed emissions
SECONDS.sleep(15);

Were it not for the final sleep(), even after subscribing, your application would terminate immediately without displaying any results, because emission occurs in the background. In Chapter 4, you will learn about BlockingObservable, which makes such simple testing easier.

What you will notice then is that the first word to occur is sit, followed by amet and elit one second later. Remember that delay() can be rewritten to timer() plus flatMap()?

Observable
    .just("Lorem", "ipsum", "dolor", "sit", "amet",
        "consectetur", "adipiscing", "elit")
    .flatMap(word ->
        timer(word.length(), SECONDS).map(x -> word));

Order of Events After flatMap()

What flatMap() essentially does is take a master sequence (Observable) of values appearing over time (events) and replace each of the events with an independent subsequence. These subsequences are generally unrelated to one another and to the event in the master sequence that generated them.

To make it clear, you no longer have a single master sequence but a set of Observables, each working on its own, coming and going over time. Therefore, flatMap() cannot give any guarantee about the order in which those subevents will arrive at the downstream operator/subscriber. Take this simple code snippet as an example:

just(10L, 1L)
    .flatMap(x ->
        just(x).delay(x, TimeUnit.SECONDS))
    .subscribe(System.out::println);

In this example, we delay event 10L by 10 seconds and event 1L (chronologically appearing later in upstream) by 1 second. As a result, we see 1 after a second and 10 nine seconds later—the order of events in upstream and downstream is different!

Preserving Order Using concatMap()

What if you absolutely need to keep the order of downstream events so that they align perfectly with upstream events? In other words, downstream events resulted from upstream event N must occur before events from N + 1. It turns out there is a handy concatMap() operator that has the exact same syntax as flatMap() but works quite differently:

Observable
    .just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
    .concatMap(this::loadRecordsFor);

This time the output is exactly what we anticipated:

Sun-0, Sun-1, Sun-2, Sun-3, Sun-4, Mon-0, Mon-1, Mon-2, Mon-3, Mon-4

So what happened under the hood? When the first event (Sunday) appears from upstream, concatMap() subscribes to an Observable returned from loadRecordsFor() and passes all events emitted from it downstream. When this inner stream completes, concatMap() waits for the next upstream event (Monday) and continues. concatMap() does not introduce any concurrency whatsoever but it preserves the order of upstream events, avoiding overlapping.
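
The loadRecordsFor() implementation is not shown here; a hypothetical version consistent with the output above (interval values chosen arbitrarily to simulate asynchronous loading) might look like this:

Observable<String> loadRecordsFor(DayOfWeek dow) {
    switch(dow) {
        case SUNDAY:
            return Observable
                .interval(90, TimeUnit.MILLISECONDS)
                .take(5)
                .map(i -> "Sun-" + i);
        case MONDAY:
            return Observable
                .interval(65, TimeUnit.MILLISECONDS)
                .take(5)
                .map(i -> "Mon-" + i);
        default:
            return Observable.empty();
    }
}

Note that Monday records are produced faster than Sunday ones; concatMap() still keeps them strictly after all Sunday records because it does not even subscribe to the second inner stream until the first completes.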

CONTROLLING THE CONCURRENCY OF FLATMAP()

Suppose that you have a large list of users wrapped in an Observable. Each User has a loadProfile() method that returns an Observable<Profile> instance fetched using an HTTP request. Our aim is to load the profiles of all users as fast as possible. flatMap() was designed exactly for that: to allow spawning concurrent computation for each upstream value:

class User {
    Observable<Profile> loadProfile() {
        //Make HTTP request...
    }
}

class Profile {/* ... */}

//...

List<User> veryLargeList = //...
Observable<Profile> profiles = Observable
    .from(veryLargeList)
    .flatMap(User::loadProfile);

At first sight it looks great. Observable<User> is constructed from a fixed List using the from() operator; thus, when subscribed it emits all users pretty much instantaneously. For every new User, flatMap() calls loadProfile(), which returns an Observable<Profile>.

Then, flatMap() transparently subscribes to every new Observable<Profile>, redirecting all Profile events downstream. Subscription to inner Observable<Profile> most likely makes a new HTTP connection. Therefore, if we have, say 10,000 Users, we suddenly triggered 10,000 concurrent HTTP connections. If all of them hit the same server, we can expect any of the following:

  • Rejected connections
  • Long wait time and timeouts
  • Crashing the server
  • Hitting rate limits or being blacklisted
  • Overall latency increase
  • Issues on the client, including too many open sockets or threads and excessive memory usage

Increasing concurrency pays off only up to a certain point. If you try to run too many operations concurrently, you will most likely end up with a lot of context switches, high memory and CPU utilization, and overall performance degradation.

One solution could be to slow down Observable<User> somehow so that it does not emit all Users at once. However, tuning that delay to achieve an optimal concurrency level is troublesome. Instead, flatMap() has a very simple overloaded version that limits the total number of concurrent subscriptions to inner streams:

flatMap(User::loadProfile, 10);

The maxConcurrent parameter limits the number of ongoing inner Observables. In practice when flatMap() receives the first 10 Users it invokes loadProfile() for each of them. However, when the 11th User appears from upstream, flatMap() will not even call loadProfile(). Instead, it will wait for any ongoing inner streams to complete. Therefore, the maxConcurrent parameter limits the number of background tasks that are forked from flatMap().

You can probably see that concatMap(f) is semantically equivalent to flatMap(f, 1), that is, flatMap() with maxConcurrent equal to one. We could spend a couple of extra pages discussing the nuances of flatMap(), but more exciting operators lie ahead of us.

More Than One Observable

Transforming a single Observable is interesting, but what if there are several Observables that need to cooperate? If you come from traditional concurrent programming in Java, full of Threads and Executors, you know how difficult shared mutable state and synchronization are. Fortunately, RxJava works even better in such circumstances.

Also the library has a consistent way of handling errors in all operators involving multiple streams. If any of the upstream sources emits an error notification, it will be forwarded downstream and complete the downstream sequence with an error, as well.

If more than one upstream Observable emits an error, the first one wins and the others are discarded (any Observable can emit onError only once). Finally, if you want to continue processing and emit errors only when all normal events were produced, many operators have a DelayError variant.

Treating Several Observables as One Using merge()

Imagine that we have three algorithms that are already RxJava-enabled, each one nicely encapsulated within Observable. Of course, each algorithm alone can produce zero to possibly an infinite number of results:

Observable<LicensePlate> fastAlgo(CarPhoto photo) {
    //Fast but poor quality
}

Observable<LicensePlate> preciseAlgo(CarPhoto photo) {
    //Precise but can be expensive
}

Observable<LicensePlate> experimentalAlgo(CarPhoto photo) {
    //Unpredictable, running anyway
}

What we would like to do is run these three algorithms side by side and receive results as soon as possible. We do not care which algorithm emitted an event, we want to catch all of them and aggregate into a single stream. This is what the merge() operator does:

Observable<LicensePlate> all = Observable.merge(
    preciseAlgo(photo),
    fastAlgo(photo),
    experimentalAlgo(photo)
);

I intentionally placed preciseAlgo() (presumably slowest) first to emphasize that the order of Observables passed to merge() is rather arbitrary. The merge() operator will keep a reference to all of the underlying Observables, and as soon as someone subscribes to Observable<LicensePlate> all, it will automatically subscribe to all upstream Observables at once.

Of course, the merge() operator follows the Rx contract, ensuring that events are serialized (do not overlap), even if underlying streams each emit a value at the same time. The following marble diagram illustrates how merge() works:

Merge Marble Diagram

The merge() operator is used extensively when you want to treat multiple sources of events of the same type as a single source. Also, if you have just two Observables to merge(), you can use the obs1.mergeWith(obs2) instance method.

Keep in mind that errors appearing in any of the underlying Observables will be eagerly propagated to Observers. You can use the mergeDelayError() variant of merge() to postpone any errors until all of the other streams have finished. mergeDelayError() will even make sure to collect all exceptions, not only the first one, and encapsulate them in rx.exceptions.CompositeException.
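
A minimal sketch contrasting the two behaviors (the broken stream is hypothetical):

Observable<Integer> broken = Observable
    .just(1, 2)
    .concatWith(Observable.error(new RuntimeException("Boom")));
Observable<Integer> healthy = Observable.just(3, 4);

//Forwards the error as soon as it occurs; healthy values may be cut off:
//emits 1, 2, then onError
Observable.merge(broken, healthy);

//Drains the healthy stream first, then signals the error:
//emits 1, 2, 3, 4, then onError
Observable.mergeDelayError(broken, healthy);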

Pairwise Composing Using zip() and zipWith()

Zipping is the act of taking two (or more) streams and combining them with each other in such a way that each element from one stream is paired with the corresponding event from the other. A downstream event is produced by composing the first event from each stream, then the second event from each stream, and so on.

Therefore, events appear only when all upstream sources emit an event. This is useful when you want to combine results from multiple streams that are somehow related to one another. Or, quite the contrary, when two independent streams emit values but only combining them together has business meaning. The following marble diagram illustrates how this works:

Zip Marble Diagram

The zip() and zipWith() operators are equivalent. We use the latter when we want to fluently compose one stream with another, like so: s1.zipWith(s2, ...). But when we have more than two streams to compose, the static zip() on Observable can take up to nine streams:

Observable.zip(s1, s2, s3, ...)

To understand zip(), imagine that you have two independent streams, yet those streams are entirely synchronized with each other. For example, think about the WeatherStation API that exposes temperature and wind measurements precisely every minute at the same time:

interface WeatherStation {
    Observable<Temperature> temperature();
    Observable<Wind> wind();
}

We have to make an assumption that events from these two Observables are emitted at the same time and thus with the same frequency. Under this restriction, we can safely join these two streams by combining every pair of events. This means that when an event occurs on one stream, we must hold it until the other appears, and vice versa.

The name zip implies that there are two flows of events that we join together, one from left, one from right, repeat. But in a more general version, zip() can take up to nine upstream Observables and emit an event only when all of them emit an event.

class Weather {
    public Weather(Temperature temperature, Wind wind) {
        //...
    }
}

//...

Observable<Temperature> temperatureMeasurements = station.temperature();
Observable<Wind> windMeasurements = station.wind();

temperatureMeasurements
    .zipWith(windMeasurements,
        (temperature, wind) -> new Weather(temperature, wind));

When a new Temperature event occurs, zipWith() waits (obviously without blocking!) for Wind, and vice versa. Two events are passed to our custom lambda and combined into a Weather object. Then, the cycle repeats. So far, zip() has been described in terms of streams, even infinite ones.

However, often you will find yourself using zipWith() and zip() for Observables that emit exactly one item. Such an Observable is typically an asynchronous response to some request or action.

Suppose that you would like to plan a one-day vacation in some city when the weather is sunny and airfare and hotels are cheap. To do so, we will combine several streams together and come up with all possible results:

import java.time.LocalDate;

Observable<LocalDate> nextTenDays =
    Observable
        .range(1, 10)
        .map(i -> LocalDate.now().plusDays(i));

Observable<Vacation> possibleVacations = Observable
    .just(City.Warsaw, City.London, City.Paris)
    .flatMap(city -> nextTenDays.map(date -> new Vacation(city, date)))
    .flatMap(vacation ->
        Observable.zip(
            vacation.weather().filter(Weather::isSunny),
            vacation.cheapFlightFrom(City.NewYork),
            vacation.cheapHotel(),
            (w, f, h) -> vacation
        ));

Vacation class:

class Vacation {
    private final City where;
    private final LocalDate when;

    Vacation(City where, LocalDate when) {
        this.where = where;
        this.when = when;
    }

    public Observable<Weather> weather() {
        //...
    }

    public Observable<Flight> cheapFlightFrom(City from) {
        //...
    }

    public Observable<Hotel> cheapHotel() {
        //...
    }
}

Quite a lot is happening in the preceding code. First, we generate all dates from tomorrow to 10 days ahead using a combination of range() and map(). Then, we flatMap() these days with three cities—we do not want to use zip() here, because we need all possible combinations of date versus city pairs. For each such pair, we create an instance of Vacation class encapsulating it.

Now the real logic: we zip together three Observables: Observable<Weather>, Observable<Flight>, and Observable<Hotel>. The last two are supposed to return zero or one results, depending on whether a cheap flight or hotel was found for that city/date combination.

Observable<Weather>, on the other hand, always returns something, but we use filter(Weather::isSunny) to discard nonsunny weather. So we end up with a zip() operation over three streams, each emitting zero or one items.

zip() completes as soon as any of the upstream Observables completes, discarding the other streams: thanks to this property, if any of weather, flight, or hotel is absent, the resulting zip() stream completes without emitting any items as well. This leaves us with a stream of all possible vacation plans matching the requirements.

Do not be surprised to see a zip function that does not take its arguments into account: (w, f, h) -> vacation. The outer stream of Vacation lists all possible vacation plans for every possible day. However, for each vacation, we want to make sure the weather, a cheap flight, and a hotel are present. If all these conditions are met, we return the vacation instance; otherwise, zip() will not invoke our lambda expression at all.

When Streams Are Not Synchronized with One Another: combineLatest(), withLatestFrom(), and amb()

If one of the streams outperforms the other even slightly, events from the faster Observable will need to wait longer and longer for the lagging stream. To illustrate this effect, let’s first zip() two streams that are producing items at the exact same pace:

Observable<Long> red = Observable.interval(10, TimeUnit.MILLISECONDS);
Observable<Long> green = Observable.interval(10, TimeUnit.MILLISECONDS);

Observable.zip(
    red.timestamp(),
    green.timestamp(),
    (r, g) -> r.getTimestampMillis() - g.getTimestampMillis()
).forEach(System.out::println);

The timestamp() operator wraps whatever the event type T was in the rx.schedulers.Timestamped<T> class, which has two attributes: the original value of type T and a long timestamp of when it was created.

In the zip() transformation, we simply compute the time difference between the creation of events in each stream. When the streams are synchronized, this value oscillates around zero. However, if we slightly slow down one Observable, say green becomes Observable.interval(11, MILLISECONDS), the situation is much different. The time difference between red and green keeps going up: red is produced in real time but must wait an ever-increasing amount of time for the matching item from the slower stream. Over time this difference piles up and can lead to stale data or even a memory leak, so in practice zip() must be used carefully.

What we actually expect is emitting a pair every time any upstream produces an event, using the latest known value from the other stream. This is where combineLatest() becomes useful, as illustrated by the following marble diagram:

Combine Latest Marble Diagram

Take the following artificial example. One stream produces S0, S1, S2 values every 17 milliseconds whereas the other F0, F1, F2 every 10 milliseconds (considerably faster):

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static rx.Observable.interval;

Observable.combineLatest(
    interval(17, MILLISECONDS).map(x -> "S" + x),
    interval(10, MILLISECONDS).map(x -> "F" + x),
    (s, f) -> f + ":" + s
).forEach(System.out::println);

We combine these two streams and produce a new value every time any of the streams produces something. The output quickly becomes out-of-sync, but at least values are consumed in real time, and the faster stream does not need to wait for the slower one:

F0:S0
F1:S0
F2:S0
F2:S1
F3:S1
F4:S1
F4:S2
F5:S2
F5:S3
...
F998:S586
F998:S587
F999:S587
F1000:S587
F1000:S588
F1001:S588

withLatestFrom() operator

combineLatest() is symmetric, which means that it does not distinguish between the substreams it combines. Occasionally, however, you want to emit an event every time something appears in one stream, combined with the latest value from the second stream, but not vice versa.

In other words, events from the second stream do not trigger a downstream event; they are used only when the first stream emits. You can achieve such behavior by using the withLatestFrom() operator. Let’s illustrate it with the same slow and fast streams:

Observable<String> fast = interval(10, MILLISECONDS).map(x -> "F" + x);
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);

slow
    .withLatestFrom(fast, (s, f) -> s + ":" + f)
    .forEach(System.out::println);

In the prior example, the slow stream is the primary one: the resulting Observable will always emit an event when slow emits, provided that fast has emitted at least one element so far. Conversely, the fast stream is just a helper, used only when slow emits something.

The function passed as the second argument to withLatestFrom() will combine every new value from slow with the most recent value from fast. However, new values from fast are not propagated downstream; they are just cached internally, waiting for the next slow event. The output of the preceding code snippet reveals that all slow events appear exactly once, whereas some fast events are dropped:

S0:F1
S1:F2
S2:F4
S3:F5
S4:F7
S5:F9
S6:F11
...

All slow events appearing before the first fast event are silently dropped because there is nothing with which to combine them. This is by design, but if you truly need to preserve all events from the primary stream, you must ensure that the other stream emits some dummy event as soon as possible.

For example, you can prepend said stream with some dummy event emitted immediately. The example that follows artificially slows the fast stream by pushing all events 100 milliseconds forward. Without a dummy event, we would lose a few slow events; however, by using the startWith() operator we create a new Observable that derives from fast. It starts with "FX" immediately and then continues with events from the original fast stream:

Observable<String> fast = interval(10, MILLISECONDS)
    .map(x -> "F" + x)
    .delay(100, MILLISECONDS)
    .startWith("FX");
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);

slow
    .withLatestFrom(fast, (s, f) -> s + ":" + f)
    .forEach(System.out::println);

The output reveals that no slow events are dropped. However, in the beginning we see dummy "FX" events a few times, until the first "F0" shows up after 100 milliseconds:

S0:FX
S1:FX
S2:FX
S3:FX
S4:FX
S5:FX
S6:F1
S7:F3
S8:F4
S9:F6
...

startWith() basically returns a new Observable that, upon subscription, first emits some constant values (like "FX") followed by the events of the original Observable. For example, the following code block yields 0, 1, and 2, in that order:

Observable
    .just(1, 2)
    .startWith(0)
    .subscribe(System.out::println);

amb() operator

The last tiny operator that can become useful is amb() (together with ambWith()), which subscribes to all of the upstream Observables it controls and waits for the very first item emitted. When one of the Observables emits its first event, amb() discards all of the other streams and just keeps forwarding events from the first Observable that woke up, as shown in the following marble diagram:

Amb Marble Diagram

The sample that follows illustrates how amb() works with two streams. Pay attention to the initialDelay parameter, which controls which Observable starts emitting first:

Observable<String> stream(int initialDelay, int interval, String name) {
    return Observable
        .interval(initialDelay, interval, MILLISECONDS)
        .map(x -> name + x)
        .doOnSubscribe(() ->
            log.info("Subscribe to " + name))
        .doOnUnsubscribe(() ->
            log.info("Unsubscribe from " + name));
}

//...

Observable.amb(
    stream(100, 17, "S"),
    stream(200, 10, "F")
).subscribe(log::info);

You can write an equivalent program using nonstatic ambWith(), but it is less readable because it hides the symmetry of amb(). It seems like we are applying the second stream on top of the first, whereas both of them should be treated equally:

stream(100, 17, "S")
    .ambWith(stream(200, 10, "F"))
    .subscribe(log::info);

No matter which version you prefer, they yield the same results. The slow stream produces events less frequently, but the first event appears after 100 milliseconds, whereas the fast stream begins after 200 milliseconds. What amb() does is first subscribe to both Observables, and when it encounters the first event in the slow stream, it immediately unsubscribes from the fast one and forwards events from only the slow one:

14:46:13.334: Subscribe to S
14:46:13.341: Subscribe to F
14:46:13.439: Unsubscribe from F
14:46:13.442: S0
14:46:13.456: S1
14:46:13.473: S2
14:46:13.490: S3
14:46:13.507: S4
14:46:13.525: S5

The doOnSubscribe() and doOnUnsubscribe() callbacks are useful for debugging purposes. Notice how unsubscription from F occurs roughly 100 milliseconds after subscription to S; this is the moment when the first event from the S Observable appeared. At this point, listening for events from F no longer makes any sense.

Advanced Operators: collect(), reduce(), scan(), distinct(), and groupBy()

Some operators enable more advanced transformations, such as scanning through the sequence and aggregating some value along the way (like a running average). Some operators are even stateful, managing internal state as the sequence progresses. This is how distinct() works, caching already-seen values and discarding duplicates.

Scanning Through the Sequence with scan() and reduce()

All of the operators we have explored so far work on a per-event basis (e.g., filtering, mapping, or zipping). But sometimes you want to aggregate events to shrink or simplify the initial stream. For example, consider an Observable<Long> that monitors the progress of a data transfer. Every time a chunk of data is sent, a single Long value appears, indicating the size of that chunk. This is a useful bit of information, but what we really want to know is how many bytes were transferred in total. A very bad idea is to use global state modified inside an operator:

import java.util.concurrent.atomic.LongAdder;

//BROKEN!
Observable<Long> progress = transferFile();
LongAdder total = new LongAdder();
progress.subscribe(total::add);

The preceding code can lead to very unpleasant concurrency bugs, just like any other shared state. Lambda expressions within operators can be executed from arbitrary threads so global state must be thread safe. We must also take laziness into account. RxJava tries to minimize global state and mutability as much as possible by providing composable operators. Modifying global state is tricky, even with Rx guarantees.

Moreover, we can no longer rely on Rx operators to further compose total, for example by periodically updating the user interface. Signaling when a transfer is completed is also more complex. What we really want is a way to incrementally accumulate the sizes of data chunks and report the current total every time a new chunk appears. This is what our hypothetical stream should look like:

Observable<Long> progress = // [10, 14, 12, 13, 14, 16]
Observable<Long> totalProgress = /* [10, 24, 36, 49, 63, 79]
    10
    10+14=24
    24+12=36
    36+13=49
    49+14=63
    63+16=79
*/

You can implement this relatively complex workflow easily by using the scan() operator:

Observable<Long> totalProgress = progress
    .scan((total, chunk) -> total + chunk);

The function passed to scan() takes two parameters: the last generated value (known as the accumulator) and the current value from the upstream Observable. In the first iteration, total is simply the first item from progress, whereas in each subsequent iteration it becomes the result of the previous invocation of scan().

scan() is like a bulldozer, going through the source (upstream) Observable and accumulating items. An overloaded version of scan() lets you provide an initial value (if it is different from simply the first element):

Observable<BigInteger> factorials = Observable
    .range(2, 100)
    .scan(BigInteger.ONE, (big, cur) ->
        big.multiply(BigInteger.valueOf(cur)));

factorials will generate 1, 2, 6, 24, 120, 720…, and so forth. Notice that the upstream Observable starts from 2 but the downstream starts from 1, which was our initial value (BigInteger.ONE). The rule of thumb is that the type of resulting Observable is always the same as the type of accumulator. So, if you do not provide a custom initial value of accumulator, the type T returned from scan() will not change. Otherwise (like in our factorials example), the result is of type Observable<BigInteger> because BigInteger was the type of initial value.

Sometimes, we do not care about intermediate results, just the final one. For example, we want to calculate total bytes transferred, not intermediate progress. Or, we would like to accumulate all values in some mutable data structure, like ArrayList, adding one item at a time. The reduce() operator was designed precisely for that. One rather obvious caveat: if your sequence is infinite, scan() keeps emitting events for each upstream event, whereas reduce() will never emit any event.
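
As a quick sketch of that caveat, consider a hypothetical infinite stream of ticks:

Observable<Long> ticks = Observable.interval(1, TimeUnit.SECONDS);

ticks.scan((total, tick) -> total + tick);   //emits 0, 1, 3, 6, 10... forever
ticks.reduce((total, tick) -> total + tick); //never emits: upstream never completes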

Imagine that you have a source of CashTransfer objects with getAmount() method returning BigDecimal. We would like to calculate the total amount on all transfers. The following two transformations are equivalent. They iterate over all transfers and add up amounts, beginning at ZERO:

Observable<CashTransfer> transfers = //...

Observable<BigDecimal> total1 = transfers
    .reduce(BigDecimal.ZERO,
        (totalSoFar, transfer) ->
            totalSoFar.add(transfer.getAmount()));

Observable<BigDecimal> total2 = transfers
    .map(CashTransfer::getAmount)
    .reduce(BigDecimal.ZERO, BigDecimal::add);

Both transformations yield the same result, but the second one seems simpler, despite using two steps. This is another reason to prefer smaller, more composable transformations over a single big one. Also, you can probably see that reduce() is basically scan() taking only the last element. As a matter of fact, you can implement it as follows:

public <R> Observable<R> reduce(
        R initialValue,
        Func2<R, T, R> accumulator) {
    return scan(initialValue, accumulator).takeLast(1);
}

Reduction with Mutable Accumulator: collect()

Now, let’s transform the finite stream of events of type T into a stream with just a single event of type List<T>. Of course, that event is emitted when upstream Observable<T> completes:

Observable<List<Integer>> all = Observable
    .range(10, 20)
    .reduce(new ArrayList<>(), (list, item) -> {
        list.add(item);
        return list;
    });

This example of reduce() simply begins with empty ArrayList<Integer> (an accumulator) and adds every emitted item to that ArrayList. The lambda expression responsible for reduction (accumulating) must return a new version of accumulator.

Unfortunately, List.add() does not return said List; instead, it returns a boolean, so an explicit return statement is required. To overcome this verbosity, you can use the collect() operator. It works almost exactly like reduce() but assumes that we use the same mutable accumulator for every event, as opposed to returning a new immutable accumulator every time:

Observable<List<Integer>> all = Observable
    .range(10, 20)
    .collect(ArrayList::new, List::add);

Another common use case for collect() is aggregating all events into a StringBuilder. In that case, the accumulator is an empty StringBuilder and the operation appends one item to that builder:

Observable<String> str = Observable
    .range(1, 10)
    .collect(
        StringBuilder::new,
        (sb, x) -> sb.append(x).append(", "))
    .map(StringBuilder::toString);

Just like every Observable operator, both reduce() and collect() are nonblocking, so the resulting List<Integer> containing all numbers emitted from Observable.range(10, 20) will appear when upstream signals completion; exceptions are propagated normally. Transforming Observable<T> into Observable<List<T>> is so common that a built-in toList() operator exists.
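
Using toList(), the preceding example shrinks to the following:

Observable<List<Integer>> all = Observable
    .range(10, 20)
    .toList();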

Asserting Observable Has Exactly One Item Using single()

By the way, some Observables by definition must emit exactly one value. For example, the preceding code snippet will always emit one List<Integer>, even an empty one. In such circumstances, it is worthwhile to apply a single() operator. It does not change the upstream Observable in any way; however, it makes sure it emits exactly one event. In case this assumption is wrong, you will receive an exception instead of an unexpected result.
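
For example, appending single() to the previous toList() pipeline both documents and enforces that assumption:

Observable<List<Integer>> all = Observable
    .range(10, 20)
    .toList()
    .single(); //fails with an exception if not exactly one event is emitted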

Dropping Duplicates Using distinct() and distinctUntilChanged()

An infinite stream of random values can be really useful, typically when combined with other streams. The following Observable produces pseudorandom Integer values from 0 (inclusive) to 1,000 (exclusive):

Observable<Integer> randomInts = Observable.create(subscriber -> {
    Random random = new Random();
    while (!subscriber.isUnsubscribed()) {
        subscriber.onNext(random.nextInt(1000));
    }
});

Obviously, duplicates can occur; by the pigeonhole principle, take(1001) is guaranteed to contain at least one. But what if we want to take a peek at a small number (say, 10) of unique random values? The built-in distinct() operator automatically discards events from the upstream Observable that already occurred, making sure only unique events are passed downstream:

Observable<Integer> uniqueRandomInts = randomInts
    .distinct()
    .take(10);

Every time a new value is emitted from the upstream Observable (randomInts), the distinct() operator internally makes sure that value did not occur before. The comparison happens by means of equals() and hashCode(), so ensure that you implement them according to Java guidelines (two equal objects must have the same hash code). Interestingly, distinct().take(1001) would eventually emit every single value from 0 to 999 in random order and never complete, because there is no 1,001st unique int between 0 and 999.

In “Use Case: From Callback API to Observable Stream”, we looked at an Observable<twitter4j.Status> that was emitting status updates generated on the social media website Twitter. Every time any user posted a status update, a new event was pushed from that Observable. The Status object contains several attributes, like getText(), getUser(), and so on. The distinct() operator makes no sense for Status events, given that duplicates are virtually impossible. But what if we would like to see the text of only the very first update from each user (status.getUser().getId() returns a long)? Obviously, we can extract that unique property and run distinct() on it:

Observable<Status> tweets = //...
Observable<Long> distinctUserIds = tweets
    .map(status -> status.getUser().getId())
    .distinct();

Unfortunately, by the time we get to execute distinct(), the original Status object is lost. What we really need is a way to extract a property of event used to determine uniqueness. Two events are considered equal (and the latter being discarded as a result) if that extracted property (known as key) was already seen:

Observable<Status> distinctUsers = tweets
    .distinct(status -> status.getUser().getId());

Whatever we return as the key is compared, using equals() and hashCode(), to the keys already seen. Be sure to remember that distinct() must keep track of all events/keys seen so far, for eternity.

In practice, distinctUntilChanged() is often more reasonable. In the case of distinctUntilChanged(), any given event is discarded only if the previous event was the same (by default using equals() for comparison). distinctUntilChanged() works best when we receive a steady stream of some measurements and we want to be notified only when the measured value actually changed.

Observable<Weather> measurements = //...
Observable<Weather> tempChanges = measurements
    .distinctUntilChanged(Weather::getTemperature);

The preceding code snippet emits a Weather event only when the temperature changes (changes to Wind are not taken into account). Obviously, if we want to emit an event every time either Temperature or Wind changes, the parameterless distinctUntilChanged() would work great, assuming that Weather implements equals(). The important difference between distinct() and distinctUntilChanged() is that the latter can produce duplicates, but only if they were separated by a different value.

For example, the same temperature might occur every day, separated by colder and warmer measurements. Also distinctUntilChanged() must only remember the last seen value, as opposed to distinct(), which must keep track of all unique values since the beginning of the stream. This means that distinctUntilChanged() has a predictable, constant memory footprint, as opposed to distinct().
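
A small sketch makes the difference obvious:

Observable.just(1, 1, 2, 2, 1, 1).distinctUntilChanged(); // [1, 2, 1]
Observable.just(1, 1, 2, 2, 1, 1).distinct();             // [1, 2]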

Slicing and Dicing Using skip(), takeWhile(), and Others

You are never obligated to read the stream fully, especially but not exclusively when dealing with hot infinite Observables. As a matter of fact, it is a common practice to slice Observable and consume just a small subset.

take(n) and skip(n)

The take(n) operator will truncate the source Observable prematurely after emitting only the first n events from upstream, unsubscribing afterward (or it completes earlier if the upstream did not have n items). skip(n) is the exact opposite; it discards the first n elements and begins emitting events from the upstream Observable beginning with event n+1. Both operators are quite liberal: negative numbers are treated as zero, and exceeding the Observable size is not treated as a bug:

Observable.range(1, 5).take(3);  // [1, 2, 3]
Observable.range(1, 5).skip(3);  // [4, 5]
Observable.range(1, 5).skip(5);  // []

takeLast(n) and skipLast(n)

Another self-descriptive pair of operators. takeLast(n) emits only the last n values from the stream before it completes. Internally, this operator must keep a buffer of the last n values and when it receives completion notification, it immediately emits the entire buffer.

It makes no sense to call takeLast() on an infinite stream because it will never emit anything—the stream never ends, so there are no last events. skipLast(n), on the other hand, emits all values from upstream Observable except the last n. Internally, skipLast() can emit the first value from upstream only when it received n+1 elements, second when it received n+2, and so on.

Observable.range(1, 5).takeLast(2);  // [4, 5]
Observable.range(1, 5).skipLast(2);  // [1, 2, 3]

first() and last()

The parameterless first() and last() operators can be implemented via take(1).single() and takeLast(1).single(), respectively, which should pretty much describe their behavior. The extra single() operator ensures that the downstream Observable emits precisely one value or an exception. Additionally, both first() and last() have overloaded versions that take predicates: rather than returning the very first/last value, they emit the first/last value matching a given condition.
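
For example:

Observable.range(1, 5).first();                // [1]
Observable.range(1, 5).last();                 // [5]
Observable.range(1, 5).first(x -> x % 2 == 0); // [2]
Observable.<Integer>empty().first();           // NoSuchElementException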

takeFirst(predicate)

The takeFirst(predicate) operator can be expressed by filter(predicate).take(1). The only difference between this one and first(predicate) is that it will not break with a NoSuchElementException when no value matches.
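
The difference shows up only when no value matches:

Observable.range(1, 5).takeFirst(x -> x > 10); // [] (completes empty)
Observable.range(1, 5).first(x -> x > 10);     // NoSuchElementException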

takeUntil(predicate) and takeWhile(predicate)

takeUntil(predicate) and takeWhile(predicate) are closely related to each other. takeUntil() emits values from the source Observable but completes and unsubscribes after emitting the very first value matching predicate. takeWhile(), conversely, emits values as long as they match a given predicate. So the only difference is that takeUntil() will emit the first nonmatching value, whereas takeWhile() will not.

These operators are quite important because they provide a means of conditionally unsubscribing from an Observable based on the events being emitted. Otherwise, the operator would need to somehow interact with the Subscription instance, which is not available when the operator is invoked.

Observable.range(1, 5).takeUntil(x -> x == 3); // [1, 2, 3]
Observable.range(1, 5).takeWhile(x -> x != 3); // [1, 2]

elementAt(n)

Extracting a specific item by index is rather uncommon, but you can use the built-in elementAt(n) operator for that. It is quite strict, and it can result in an IndexOutOfBoundsException being emitted when upstream Observable is not long enough or the index is negative. Of course, it returns Observable<T> of the same type T as upstream.
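
For example:

Observable.range(1, 5).elementAt(2); // [3]
Observable.range(1, 5).elementAt(9); // IndexOutOfBoundsException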

…OrDefault() operators

Many operators in this section are strict and can result in exceptions being thrown—for example, first() when upstream Observable is empty. Under these circumstances many ...OrDefault operators were introduced to replace exceptions with a default value. All of them are rather self-explanatory: elementAtOrDefault(), firstOrDefault(), lastOrDefault(), and singleOrDefault().
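
For example:

Observable.<Integer>empty().firstOrDefault(42);   // [42]
Observable.range(1, 5).elementAtOrDefault(9, -1); // [-1]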

count()

count() is an interesting operator that calculates how many events were emitted by the upstream Observable. By the way, if you need to know how many items matching a given predicate the upstream Observable emitted, filter(predicate).count() can do that idiomatically. Do not worry, all operators are lazy, so this will work even for quite large streams. Obviously, count() never emits any value in the case of an infinite stream. You can implement count() easily by using reduce():

Observable<Integer> size = Observable
    .just('A', 'B', 'C', 'D')
    .reduce(0, (sizeSoFar, ch) -> sizeSoFar + 1);

all(predicate), exists(predicate), and contains(value)

Sometimes, it is useful to ensure that all events from a given Observable match some predicate. The all(predicate) operator will emit true when the upstream completes and all values matched the predicate. However, false will be emitted as soon as the first nonconforming value is found. exists(predicate) is the exact opposite of all(); it emits true when the first matching value is found, but false in case the upstream completes without any matching value being found. Often, our predicate in exists() simply compares upstream values with some constant. In that case, you can use the contains() operator:

Observable<Integer> numbers = Observable.range(1, 5);

numbers.all(x -> x != 4);     // [false]
numbers.exists(x -> x == 4);  // [true]
numbers.contains(4);          // [true]

Ways of Combining Streams: concat(), merge(), and switchOnNext()

concat() (and the instance method concatWith()) allows joining together two Observables: when the first one completes, concat() subscribes to the second one. Importantly, concat() will subscribe to the second Observable if, and only if, the first one has completed.

concat() can even work with the same upstream Observable with different operators applied. For example, if we would like to receive only the first few and last few items from a very long stream, we could use the following:

Observable<Data> veryLong = //...
final Observable<Data> ends = Observable.concat(
    veryLong.take(5),
    veryLong.takeLast(5)
);

Keep in mind that the preceding code example subscribes to veryLong twice, which might be undesirable. Another example of concat() is providing a fallback value when the first stream does not emit anything:

Observable<Car> fromCache = loadFromCache();
Observable<Car> fromDb = loadFromDb();

Observable<Car> found = Observable
    .concat(fromCache, fromDb)
    .first();

Observables are lazy, so neither loadFromCache() nor loadFromDb() actually loads any data yet. loadFromCache() can complete without emitting any events when the cache is empty, but loadFromDb() always emits one Car. concat() followed by first() will initially subscribe to fromCache, and if that emits one item, concat() will not subscribe to fromDb. However, if fromCache is empty, concat() will continue with fromDb, subscribe to it, and load the data from the database.

The concat() operator is actually closely related to merge() and switchOnNext(). concat() works like concatenation on an ordinary List<T>: first it takes all items from the first stream, and only when it completes does it begin consuming the second stream. Of course, like all operators we have met so far, concat() is nonblocking: it emits events only when the underlying stream emits something. Now, let’s compare concat() with merge() and with switchOnNext(), which is about to be introduced.

Consider a group of people, each one holding a microphone. Every microphone is modeled as an Observable<String>, for which an event represents a single word. Obviously, events appear over time, as they are spoken. To simulate this behavior, we will construct a simple Observable for demonstration purposes, interesting in its own right:

Observable<String> speak(String quote, long millisPerChar) {
    String[] tokens = quote.replaceAll("[:,]", "").split(" ");
    Observable<String> words = Observable.from(tokens);
    Observable<Long> absoluteDelay = words
        .map(String::length)
        .map(len -> len * millisPerChar)
        .scan((total, current) -> total + current);
    return words
        .zipWith(absoluteDelay.startWith(0L), Pair::of)
        .flatMap(pair -> just(pair.getLeft())
            .delay(pair.getRight(), MILLISECONDS));
}

We take an arbitrary String of text and split it into words, removing punctuation using a regular expression. Now, for each word we calculate how long it takes to say that word, simply by multiplying the word length by millisPerChar. Then, we would like to spread the words over time so that each word appears in the resulting stream after its calculated delay. Clearly, a simple from() operator is not enough:

Observable<String> words = Observable.from(tokens);

We want each word to appear with a delay based on the lengths of the words that precede it. The first naive approach simply delays each word according to its own length:

words.flatMap(word -> Observable
    .just(word)
    .delay(word.length() * millisPerChar, MILLISECONDS));

This solution is incorrect. The Observable will first emit all one-letter words at the same time. Then, after a while, all two-letter words followed by all three-letter words. What we want is to have the first word appear immediately and then the second word after a delay, depending on the length of the first word.

This sounds terribly complex but turns out to be quite pleasant. First, we create a helper stream from words that contains only the relative delays induced by each word:

words
    .map(String::length)
    .map(len -> len * millisPerChar);

Assuming millisPerChar is 100 and the words are Though this be madness, we first get the following stream: 600, 400, 200, 700. If we were to simply delay() each word by that duration, the word “be” would appear first, and the other words would be scrambled as well. What we really want is a cumulative sequence of absolute delays, like this: 600; 600 + 400 = 1,000; 1,000 + 200 = 1,200; 1,200 + 700 = 1,900. This is easy using the scan() operator:

Observable<Long> absoluteDelay = words
    .map(String::length)
    .map(len -> len * millisPerChar)
    .scan((total, current) -> total + current);

Now that we have a sequence of words and a sequence of absolute delays for each of them, we can zip these two streams together. This is the kind of situation in which zip() shines:

words
    .zipWith(absoluteDelay.startWith(0L), Pair::of)
    .flatMap(pair -> just(pair.getLeft()))

This makes a lot of sense because we know the two streams have exactly the same size and are entirely in sync with each other. Well…almost. We do not want the first word to be delayed at all. Instead, the length of the first word should influence the delay of the second word, the total length of the first and second words should influence the delay of the third word, and so on. You can achieve such a shift easily by simply prepending absoluteDelay with 0:

import org.apache.commons.lang3.tuple.Pair;
//...
words
    .zipWith(absoluteDelay.startWith(0L), Pair::of)
    .flatMap(pair -> just(pair.getLeft())
            .delay(pair.getRight(), MILLISECONDS));

We construct a sequence of pairs (word, absolute delay of that word), making sure that the first word is not delayed at all. These pairs might look as follows:

(Though, 0)
(this, 600)
(be, 1000)
(madness, 1200)
...

This is our speech timeline, each word accompanied by its point in time. All we need to do is turn every pair into a one-element Observable shifted in time:

flatMap(pair -> just(pair.getLeft())
        .delay(pair.getRight(), MILLISECONDS));

After so much preparation, we can finally see how concat(), merge(), and switchOnNext() differ. Suppose that three people were quoting Hamlet by William Shakespeare:

Observable<String> alice = speak(
        "To be, or not to be: that is the question", 110);
Observable<String> bob = speak(
        "Though this be madness, yet there is method in't", 90);
Observable<String> jane = speak(
        "There are more things in Heaven and Earth, " +
        "Horatio, than are dreamt of in your philosophy", 100);

As you can see, each person has a slightly different pace measured in millisPerChar. What happens if all people speak at the same time? RxJava can answer this question:

Observable
    .merge(
        alice.map(w -> "Alice: " + w),
        bob.map(w -> "Bob: " + w),
        jane.map(w -> "Jane: " + w)
    )
    .subscribe(System.out::println);

The output is very chaotic; the words spoken by each person interleave with one another. All we hear is noise, and without prefixing each phrase it would be difficult to understand anything:

Alice: To
Bob: Though
Jane: There
Alice: be
Alice: or
Jane: are
Alice: not
Bob: this
Jane: more
Alice: to
Jane: things
Alice: be
Bob: be
Alice: that
Bob: madness
Jane: in
Alice: is
Jane: Heaven
Alice: the
Bob: yet
Alice: question
Jane: and
Bob: there
Jane: Earth
Bob: is
Jane: Horatio
Bob: method
Jane: than
Bob: in't
Jane: are
Jane: dreamt
Jane: of
Jane: in
Jane: your
Jane: philosophy

This is how merge() works: it subscribes to words of each person immediately and forwards them downstream, no matter which person is speaking. If two streams emit an event at more or less the same time, they are both forwarded right away. There is no buffering or halting events within this operator.

The situation is much different if we replace merge() with the concat() operator.
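
Assuming the same alice, bob, and jane streams, the pipeline is identical except for the operator:

Observable
    .concat(
        alice.map(w -> "Alice: " + w),
        bob.map(w -> "Bob: " + w),
        jane.map(w -> "Jane: " + w)
    )
    .subscribe(System.out::println);

This time, the output looks as follows: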

Alice: To
Alice: be
Alice: or
Alice: not
Alice: to
Alice: be
Alice: that
Alice: is
Alice: the
Alice: question
Bob: Though
Bob: this
Bob: be
Bob: madness
Bob: yet
Bob: there
Bob: is
Bob: method
Bob: in't
Jane: There
Jane: are
Jane: more
Jane: things
Jane: in
Jane: Heaven
Jane: and
Jane: Earth
Jane: Horatio
Jane: than
Jane: are
Jane: dreamt
Jane: of
Jane: in
Jane: your
Jane: philosophy

Now the order is perfect. concat(alice, bob, jane) first subscribes to alice and keeps forwarding events from that first Observable until it is exhausted and completed. Then, concat() switches to bob.

Think about hot and cold Observables for a while. In the case of merge(), all events from all streams are forwarded because merge() subscribes eagerly to every stream. However, concat() subscribes only to the first stream, so in the case of hot Observables you might expect a different outcome. By the time the first Observable completes, the second one might be emitting an entirely different sequence of events. Keep in mind that concat() does not buffer the second Observable until the first one completes; instead, it simply subscribes lazily.
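
The following minimal sketch illustrates that laziness; using PublishSubject (a hot Observable from rx.subjects) to hand-feed events is our own assumption for demonstration purposes:

PublishSubject<String> first = PublishSubject.create();
PublishSubject<String> second = PublishSubject.create();
Observable.concat(first, second).subscribe(System.out::println);
second.onNext("lost"); //concat() has not subscribed to second yet, so this event is gone
first.onNext("A");     //printed
first.onCompleted();   //only now does concat() subscribe to second
second.onNext("B");    //printed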

switchOnNext() is an entirely different way of combining Observables. Imagine that you have an Observable<Observable<T>>: a stream of events in which each event is a stream on its own.

This situation actually makes sense, for example, if you have a set of mobile phones connecting to and disconnecting from the network (the outer stream). Each new connection is an event, but every such event is itself a stream of independent heartbeat messages (Observable<Ping>).

In our case, we will have an Observable<Observable<String>>, where each inner stream is a quote from a different person: alice, bob, or jane:

import java.util.Random;
Random rnd = new Random();
Observable<Observable<String>> quotes = just(
        alice.map(w -> "Alice: " + w),
        bob.map(w -> "Bob: " + w),
        jane.map(w -> "Jane: " + w));

First, we wrap the alice, bob, and jane Observables into an Observable<Observable<String>>. Let us reiterate: the quotes Observable emits three events immediately, each event being an inner Observable<String>. Every inner Observable<String> represents the words spoken by one person.

To illustrate how switchOnNext() works, we shall delay the emission of the inner Observables. We are not delaying each word within an Observable (variant A) but the entire Observable (variant B is subtly different):

//A
map(innerObs ->
        innerObs.delay(rnd.nextInt(5), SECONDS))
//B
flatMap(innerObs -> just(innerObs)
        .delay(rnd.nextInt(5), SECONDS))

In variant A, the Observable appears immediately in the outer stream but begins emitting events with some delay. In variant B, on the other hand, we shift the entire Observable event forward in time so that it appears in the outer Observable much later.

Now, the reason why we needed such a complex setup: both the static concat() and merge() operators can work with either a fixed list of Observables or an Observable of Observables. In the case of switchOnNext(), only the latter makes sense.
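
For the record, both static variants accept such a stream of streams directly; a brief sketch reusing the quotes stream defined earlier:

Observable<String> everyone = Observable.merge(quotes);  //subscribes to all inner streams eagerly
Observable<String> oneByOne = Observable.concat(quotes); //one inner stream at a time, in order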

switchOnNext() begins by subscribing to the outer Observable<Observable<T>>, which emits inner Observable<T>s. As soon as the first inner Observable<T> appears, this operator subscribes to it and begins pushing events of type T downstream. Now, what happens if the next inner Observable<T> appears? switchOnNext() discards the first Observable<T> by unsubscribing from it and switches to the next one (thus the name). In other words, when we have a stream of streams, switchOnNext() always forwards downstream events from the most recent inner stream, even if older streams keep emitting fresh events.

This is how it looks in our Hamlet quoting example:

Random rnd = new Random();
Observable<Observable<String>> quotes = just(
        alice.map(w -> "Alice: " + w),
        bob.map(w -> "Bob: " + w),
        jane.map(w -> "Jane: " + w))
    .flatMap(innerObs -> just(innerObs)
            .delay(rnd.nextInt(5), SECONDS));
Observable
    .switchOnNext(quotes)
    .subscribe(System.out::println);

One of the possible outcomes, due to the random nature of this example, could look like this:

Jane: There
Jane: are
Jane: more
Alice: To
Alice: be
Alice: or
Alice: not
Alice: to
Bob: Though
Bob: this
Bob: be
Bob: madness
Bob: yet
Bob: there
Bob: is
Bob: method
Bob: in't

Each person starts speaking after a random delay of zero to four seconds. In this particular round it was Jane’s Observable<String> that came first, but after citing a few words, Alice’s Observable<String> appeared in the outer Observable. At this point, switchOnNext() unsubscribes from jane, and we never hear the rest of her quote. That Observable is discarded and ignored; switchOnNext() listens only to alice at the moment. However, the inner Observable is again interrupted when Bob’s quote appears. In theory, switchOnNext() could produce all of the events from the inner Observables if they did not overlap, each completing before the next one appears.

Now, what would happen if we delayed only the events in every inner Observable (variant A, remember?) rather than the Observables themselves? Well, all three inner Observables would appear in the outer Observable at the same time, and switchOnNext() would end up subscribed to only one of them, as shown below.
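
Here is variant A end to end (a sketch reusing rnd and the same streams; because all three inner Observables appear immediately, switchOnNext() rapidly switches through them and effectively only the last one is heard):

Observable<Observable<String>> quotesA = just(
        alice.map(w -> "Alice: " + w),
        bob.map(w -> "Bob: " + w),
        jane.map(w -> "Jane: " + w))
    .map(innerObs -> innerObs.delay(rnd.nextInt(5), SECONDS));
Observable
    .switchOnNext(quotesA)
    .subscribe(System.out::println); //most likely only Jane's (delayed) words appear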

Criteria-Based Splitting of Stream Using groupBy()

One of the techniques often used together with domain-driven design (DDD) is event sourcing. In this architectural style, data is not stored as a snapshot of the current state that is mutated in place (that is, using SQL UPDATE queries).

Instead, a sequence of immutable domain events (facts) describing what already happened is kept in an append-only data store. Using this design, we never overwrite any data, effectively getting an audit log for free. Moreover, the current state of the data can be seen only by applying these facts one after another, starting from an empty view.

The process of applying events on top of an initial empty state is known as projection in event sourcing. A single source of facts can drive multiple different projections. For example, we might have a stream of facts related to a reservation system, like TicketReserved, ReservationConfirmed, and TicketBought—the past tense is important because facts always reflect actions and events that already occurred. From a single stream of facts (also being the single source of truth), we can derive multiple projections, such as the following:

  • List of all confirmed reservations
  • List of reservations canceled today
  • Total revenue per week
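
For example, the first of these projections can be derived straight from the fact stream. The following is a minimal sketch; the ReservationConfirmed subtype of ReservationEvent is a hypothetical illustration:

Observable<ReservationEvent> facts = factStore.observe();
Observable<UUID> confirmed = facts
    .filter(ev -> ev instanceof ReservationConfirmed) //hypothetical fact subtype
    .map(ReservationEvent::getReservationUuid);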

When the system evolves, we can discard old projections and build new ones, taking advantage of the data collected eagerly in facts. Suppose that you would like to build a projection containing all reservations together with their status:

FactStore factStore = new CassandraFactStore();
Observable<ReservationEvent> facts = factStore.observe();
facts.subscribe(this::updateProjection);
//...
void updateProjection(ReservationEvent event) {
    UUID uuid = event.getReservationUuid();
    Reservation res = loadBy(uuid)
            .orElseGet(() -> new Reservation(uuid));
    res.consume(event);
    store(uuid, res);
}

private void store(UUID id, Reservation modified) {
    //...
}

Optional<Reservation> loadBy(UUID uuid) {
    //...
}

class Reservation {
    Reservation consume(ReservationEvent event) {
        //mutate myself
        return this;
    }
}

Obviously, the stream of facts is expressed as an Observable. Some other part of the system receives API calls or web requests, reacts (e.g., charges the customer’s credit card), and stores facts (domain events) about what happened. Other parts of the system (or even other systems!) can consume these facts by subscribing to the stream and building a snapshot of the current system state from some arbitrary perspective.

Our code is quite simple: for each ReservationEvent we load a Reservation from our projection’s data store. If the Reservation was not found, it means this was the very first event associated with this UUID, so we begin with an empty Reservation. We then pass the ReservationEvent to the Reservation object, which can update itself to reflect any type of fact. Finally, we store the Reservation back.

This brief introduction to event sourcing will help us to understand why the groupBy() operator is useful. After a while, we discover that updates to the Reservation projection fall behind; we cannot keep up with the rate of facts being generated. The data store can easily handle concurrent reads and updates, so we can try to parallelize the handling of facts:

Observable<ReservationEvent> facts = factStore.observe();
facts
    .flatMap(this::updateProjectionAsync)
    .subscribe();
//...
Observable<ReservationEvent> updateProjectionAsync(ReservationEvent event) {
    //possibly asynchronous
}

In this case, we consume facts in parallel, or, to be more precise, receiving is sequential but handling (in updateProjectionAsync()) is possibly asynchronous. updateProjectionAsync() alters the state of the Reservation objects inside a projection. But looking at how updateProjection() was implemented, we quickly see a possible race condition: two threads can consume different events, modify the same Reservation, and try to store it—but the first update is overwritten and effectively lost.

This is where groupBy() comes in handy. It splits a stream into multiple parallel streams based on some key, each holding events with a given key. In this case, we want to split one huge stream of all facts regarding reservations into a large number of smaller streams, each emitting only the events related to a specific UUID:

Observable<ReservationEvent> facts = factStore.observe();
Observable<GroupedObservable<UUID, ReservationEvent>> grouped =
        facts.groupBy(ReservationEvent::getReservationUuid);
grouped.subscribe(byUuid -> {
    byUuid.subscribe(this::updateProjection);
});

This example contains quite a few new constructs. First, we take the upstream Observable<ReservationEvent> and group it by UUID (ReservationEvent::getReservationUuid).

You might first expect that groupBy() should return a List<Observable<ReservationEvent>>—after all, we transform a single stream into multiple ones. This assumption breaks when you realize that groupBy() cannot possibly know how many different keys (UUIDs) the upstream will generate.

Therefore, it must produce them on the fly: whenever a new UUID is discovered, a new GroupedObservable<UUID, ReservationEvent> is emitted, pushing events related to that UUID. So it becomes clear that the outer data structure must be an Observable.

But what is this GroupedObservable<UUID, ReservationEvent> anyway? GroupedObservable is a simple subclass of Observable that, apart from honoring the standard Observable contract, also exposes the key to which all events in that stream belong (a UUID, in our case).

The number of emitted GroupedObservables can be anything from one (if all events have the same key) to the total number of events (if each upstream event has a unique key). This is one of those cases in which nested Observables are not that bad. When we subscribe to the outer Observable, every emitted value is actually another Observable (a GroupedObservable) to which you can subscribe. Events within each inner stream are related to one another (like sharing the same correlation ID); however, the inner streams are unrelated to one another and can be processed separately.
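
To build some intuition outside of the event-sourcing example, here is a toy sketch that groups numbers by parity; note how getKey() identifies each inner stream:

Observable<GroupedObservable<Boolean, Integer>> byParity = Observable
    .range(1, 6)
    .groupBy(x -> x % 2 == 0);
byParity.subscribe(group ->
    group.subscribe(x ->
        System.out.println(group.getKey() + ": " + x)));
//prints: false: 1, true: 2, false: 3, true: 4, false: 5, true: 6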

Where to Go from Here?

There are dozens of other operators built into RxJava. Many of them will be explained in Chapter 6, but going through the entire API is not very reasonable and quite time-consuming. Also, such an exhaustive description would become obsolete from version to version. However, you should have a basic understanding of what operators can do for you and how they work. The next logical step is writing custom operators.

Writing Custom Operators

We barely scratched the surface of available operators in RxJava, and you will learn many more throughout the course of this book. Moreover, the true power of operators comes from their composition. Following the UNIX philosophy of “small, sharp tools,” each operator does one small transformation at a time.

This section will first guide you through the compose() operator, which allows fluent composition of smaller operators, and later introduce the lift() operator, which helps you write entirely new custom operators.

Reusing Operators Using compose()

Let’s begin by looking at an example. For some reason, we want to transform an upstream Observable so that every other item is discarded and we receive only the odd items (first, third, fifth, and so on). In “Flow Control”, we will learn about the buffer() operator, which makes this task very simple (buffer(1, 2) does almost exactly what we want). However, let’s pretend that we do not know this operator yet; we can implement the same functionality easily by composing several operators:

import org.apache.commons.lang3.tuple.Pair;
//...
Observable<Boolean> trueFalse = Observable.just(true, false).repeat();
Observable<T> upstream = //...
Observable<T> downstream = upstream
    .zipWith(trueFalse, Pair::of)
    .filter(Pair::getRight)
    .map(Pair::getLeft);

First, we generate an infinite Observable<Boolean> emitting true and false alternately. We can implement this easily by creating a fixed [true, false] stream with just two items and then repeating it infinitely with the repeat() operator. repeat() simply intercepts the completion notification from upstream and, rather than passing it downstream, resubscribes. Therefore, it is not guaranteed that repeat() will keep cycling through the same sequence of events, but it happens to be the case when the upstream is a simple fixed stream.

We zipWith() our upstream Observable with this infinite stream of true and false. However, zipping requires a function that combines two items. This is simpler in other languages; in Java, we help ourselves with the Apache Commons Lang library, which provides a simple Pair class. If you want to avoid a third-party library, an alternative implementation follows:

import static rx.Observable.empty;
import static rx.Observable.just;
//...
upstream.zipWith(trueFalse, (t, bool) ->
        bool ? just(t) : empty())
    .flatMap(obs -> obs)

At first glance, the flatMap() here looks odd and doesn’t appear to be doing anything important, but it is actually crucial. From the zipWith() transformation we return an Observable (one element or empty), which leads to an Observable<Observable<T>>. By using flatMap() this way we get rid of this nesting level—after all, a lambda expression in flatMap() is supposed to return an Observable for each input element, and here each input element already is an Observable.

No matter which implementation you choose, this is not very reusable. If you need to reuse the “every odd element” sequence of operators, you either copy-paste it or create a utility method like this:

static <T> Observable<T> odd(Observable<T> upstream) {
    Observable<Boolean> trueFalse = just(true, false).repeat();
    return upstream
            .zipWith(trueFalse, Pair::of)
            .filter(Pair::getRight)
            .map(Pair::getLeft);
}

But you can no longer fluently chain operators; in other words, you cannot say: obs.op1().odd().op2(). Unlike C# (where reactive extensions originated) and Scala (via implicits), Java does not allow extension methods. But the built-in compose() operator comes as close as possible. compose() takes a function as an argument that is supposed to transform the upstream Observable via a series of other operators. This is how it works in practice:

private <T> Observable.Transformer<T, T> odd() {
    Observable<Boolean> trueFalse = just(true, false).repeat();
    return upstream -> upstream
            .zipWith(trueFalse, Pair::of)
            .filter(Pair::getRight)
            .map(Pair::getLeft);
}
//...
//[A, B, C, D, E...]
Observable<Character> alphabet =
        Observable
            .range(0, 'Z' - 'A' + 1)
            .map(c -> (char) ('A' + c));
//[A, C, E, G, I...]
alphabet
    .compose(odd())
    .forEach(System.out::println);

The odd() function returns a Transformer<T, T> from Observable<T> to Observable<T> (of course, the types can differ). Transformer is itself a function, so we can express it with a lambda expression (upstream -> upstream...).

Notice that the odd() function is executed eagerly when the Observable is assembled, not during subscription. Interestingly, if you want to emit even values (2nd, 4th, 6th, etc.) rather than odd ones (1st, 3rd, 5th, etc.), simply replace trueFalse with trueFalse.skip(1).
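
Following that hint, an even() transformer is a one-line change from odd() (a sketch under the same Pair-based approach):

private <T> Observable.Transformer<T, T> even() {
    Observable<Boolean> falseTrue = just(true, false).repeat().skip(1);
    return upstream -> upstream
            .zipWith(falseTrue, Pair::of)
            .filter(Pair::getRight)
            .map(Pair::getLeft);
}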

Implementing Advanced Operators Using lift()

Implementing custom operators is tricky because backpressure and the subscription mechanism need to be taken into account. Therefore, try your best to implement your requirements from existing operators rather than inventing your own. Built-in operators are much better tested and proven.

However, if none of the supplied operators work for you, the lift() meta-operator will help. compose() is only useful for grouping existing operators together. With lift(), on the other hand, you can implement almost any operator, altering the flow of upstream events.

Whereas compose() transforms Observables, lift() allows transforming Subscribers. When you subscribe() to an Observable, the Subscriber instance wrapping your callback travels up to the Observable it subscribed to and causes the Observable’s create() method to be invoked with our Subscriber as an argument (a gross simplification). So every time we subscribe, a Subscriber travels up through all the operators to the original Observable. Obviously, between the Observable and subscribe() there can be an arbitrary number of operators, altering events flowing downstream, as illustrated here:

Observable
    .range(1, 1000)
    .filter(x -> x % 3 == 0)
    .distinct()
    .reduce((a, x) -> a + x)
    .map(Integer::toHexString)
    .subscribe(System.out::println);

But here is an interesting fact: if you look at the source code of RxJava and replace the operator invocations with their bodies, this quite complex sequence of operators becomes very regular (notice how reduce() is implemented using scan().takeLast(1).single()):

Observable
    .range(1, 1000)
    .lift(new OperatorFilter<>(x -> x % 3 == 0))
    .lift(OperatorDistinct.<Integer>instance())
    .lift(new OperatorScan<>((Integer a, Integer x) -> a + x))
    .lift(OperatorTakeLastOne.<Integer>instance())
    .lift(OperatorSingle.<Integer>instance())
    .lift(new OperatorMap<>(Integer::toHexString))
    .subscribe(System.out::println);

Almost all operators, excluding those working with multiple streams at once (like flatMap()), are implemented by means of lift(). When we subscribe() at the very bottom, a Subscriber<String> instance is created and passed to the immediate predecessor. It can be a “true” Observable<String> that emits events or just the result of some operator, map(Integer::toHexString) in our case.

map() itself does not emit events, yet it received a Subscriber that wants to receive them. What map() does (through the lift() helper operator) is transparently subscribe to its parent (reduce() in the preceding example). However, it cannot pass along the same Subscriber instance it received, because subscribe() provided a Subscriber<String>, whereas reduce() expects a Subscriber<Integer>.

After all, that is what map() is doing here: transforming Integer to String. So instead, the map() operator creates a new artificial Subscriber<Integer>, and every time this special Subscriber receives anything, it applies the Integer::toHexString function and notifies the downstream Subscriber<String>.

Looking under the hood of the map() operator

This is essentially what the OperatorMap class is doing: providing a transformation from the downstream (child) Subscriber<R> into an upstream Subscriber<T>. Here is the real implementation found in RxJava, with some minor readability simplifications:

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<T, R> transformer;

    public OperatorMap(Func1<T, R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<T> call(final Subscriber<R> child) {
        return new Subscriber<T>(child) {

            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    child.onNext(transformer.call(t));
                } catch (Exception e) {
                    onError(e);
                }
            }
        };
    }
}

One unusual detail is the reversed order of the T and R generic types. The map() operator transforms values flowing from upstream of type T into type R. However, the operator’s responsibility is transforming a Subscriber<R> (coming from the downstream subscription) into a Subscriber<T> (passed to the upstream Observable): we subscribe via a Subscriber<R>, whereas map() is applied against an Observable<T>, which requires a Subscriber<T>.

Keep in mind that until someone actually subscribes, we have merely created a new Observable (lift(), like any other operator, creates a new Observable) with a reference to an OperatorMap instance underneath, which in turn holds a reference to our function. Only when someone actually subscribes is the call() function of OperatorMap invoked. This function receives our Subscriber<String> (e.g., wrapping System.out::println) and returns another Subscriber<Integer>. It is the latter Subscriber that travels upstream, to the preceding operators.
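
In other words, assembly and subscription are two distinct phases. A minimal sketch, reusing the internal OperatorMap from the desugared listing earlier:

Observable<String> hex = Observable
    .range(1, 3)
    .lift(new OperatorMap<>(Integer::toHexString)); //call() has not run yet
hex.subscribe(System.out::println); //now call() runs and the returned Subscriber travels upstream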

That is pretty much how all operators work, both built-in and custom. You receive a Subscriber and return another one that enhances the events and passes whatever it wishes to the downstream Subscriber.

Our first operator

This time we would like to implement an operator that will emit toString() of every odd (1st, 3rd, 5th, etc.) element. It is best explained with some sample code:

Observable<String> odd = Observable
    .range(1, 9)
    .lift(toStringOfOdd());
// Will emit: "1", "3", "5", "7" and "9" strings

You can achieve the same functionality by using built-in operators; we are writing a custom operator just for educational purposes:

Observable
    .range(1, 9)
    .buffer(1, 2)
    .concatMapIterable(x -> x)
    .map(Object::toString);

The buffer() operator will be introduced in “Buffering Events to a List”; for the time being, all you need to know is that buffer(1, 2) transforms any Observable<T> into an Observable<List<T>>, where each inner List holds exactly one odd element and skips the even ones. Having a stream of lists like List(1), List(3), and so on, we reconstruct a flat stream using concatMapIterable(). But for the sake of the learning experience, let’s implement a custom operator that does all of this in a single step. The custom operator can be in one of two states:

  • It received an odd event (1st, 3rd, 5th, etc.) from upstream, which it forwards downstream after applying toString() to it
  • It received an even event, which it simply discards

Then the cycle repeats. The operator might look like this:

<T> Observable.Operator<String, T> toStringOfOdd() {
    return new Observable.Operator<String, T>() {

        private boolean odd = true;

        @Override
        public Subscriber<? super T> call(Subscriber<? super String> child) {
            return new Subscriber<T>(child) {

                @Override
                public void onCompleted() {
                    child.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    child.onError(e);
                }

                @Override
                public void onNext(T t) {
                    if (odd) {
                        child.onNext(t.toString());
                    } else {
                        request(1);
                    }
                    odd = !odd;
                }
            };
        }
    };
}

The request(1) invocation will be explained much later in “Honoring the Requested Amount of Data”. For now you can understand it like this: when a Subscriber requests just a subset of events—for example, only the first two (take(2))—RxJava takes care of requesting only that amount of data by calling request(2) internally.

This request is passed upstream, and we receive just 1 and 2. However, we drop 2 (even), yet we were obliged to provide two events downstream. Therefore, we must request one extra event (request(1)) so that we receive 3 as well. RxJava implements quite a sophisticated mechanism called backpressure that allows subscribers to request only the amount of events they can process, protecting them from producers outperforming consumers.
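
You can observe this compensation with the operator we just wrote (a sketch):

Observable
    .range(1, 10)
    .lift(toStringOfOdd())
    .take(2) //downstream requests only two events
    .subscribe(System.out::println);
//prints "1" and "3"; dropping 2 forced the operator to request one extra event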

For better or worse, null is a valid event value in RxJava; that is, Observable.just("A", null, "B") is as good as any other stream. You need to take that into account when designing custom operators as well as when applying operators. However, passing null is generally considered nonidiomatic, and you should use wrapper value types instead.

Another interesting pitfall you might encounter is failing to provide the child Subscriber as an argument to the new Subscriber, as shown here:

<T> Observable.Operator<String, T> toStringOfOdd() {
    //BROKEN
    return child -> new Subscriber<T>() {
        //...
    };
}

The parameterless constructor of Subscriber compiles just fine, and again our operator seems to work. But let’s see how it behaves with an infinite stream:

Observable
    .range(1, 4)
    .repeat()
    .lift(toStringOfOdd())
    .take(3)
    .subscribe(
        System.out::println,
        Throwable::printStackTrace,
        () -> System.out.println("Completed")
    );

We build an infinite stream of numbers (1, 2, 3, 4, 1, 2, 3…), apply our operator (“1”, “3”, “1”, “3”…), and take only the first three values. This is absolutely fine and should never fail; after all, streams are lazy. But remove child from the new Subscriber(child) constructor, and our Observable never signals completion after emitting “1”, “3”, “1”. What happened?

The take(3) operator requested only the first three values and then wanted to unsubscribe(). Unfortunately, the unsubscription request never made it to the original stream, which keeps producing values. Even worse, these values are processed by our custom operator and passed to the downstream Subscriber (take(3)), which is not even listening anymore.

Implementation details aside, as a rule of thumb, pass the downstream Subscriber as a constructor argument to the new Subscriber when writing your own operators. A no-argument constructor is rarely used, and it is very unlikely that you will need it for simple operators.

This is just the tip of the iceberg with respect to the issues you can encounter when writing your own operators. Luckily, it is very seldom that we cannot achieve what we want with the built-in mechanisms.

Summary

The true power of RxJava lies in its operators. Declarative transformation of streams of data is safe yet expressive and flexible. With a strong foundation in functional programming, operators play a deciding role in RxJava adoption.

Mastering the built-in operators is a key to success with this library. Remember that we have not seen all of the operators yet; see, for example, “Flow Control”. At this point, though, you should have a good overview of what RxJava can do and how to enhance it when it cannot do something directly.

(To Be Continued)