Reactive Programming with RxJava: Reactive Extensions

Reactive Extensions

This chapter will guide you through the core concepts related to Reactive Extensions and RxJava. You will become very comfortable with Observable<T>, Observer<T>, and Subscriber<T>, as well as a few helpful utility methods called operators. Observable is the core API of RxJava, so make sure you understand how it works and what it represents.

Throughout this chapter, you will learn what Observable really is and how to create it and interact with it. The knowledge you gain is essential to idiomatically provide and consume reactive APIs based on RxJava. RxJava was designed to ease the pain of asynchronous and event-driven programming, but you must understand some core principles and semantics in order to take advantage of that.

Anatomy of rx.Observable

rx.Observable<T> represents a flowing sequence of values. It is the abstraction that you will use all of the time. Because these values often appear over a wide time range, we tend to think about an Observable as a stream of events. If you look around, you will find many examples of streams:

  • User interface events
  • Bytes transferred over the network
  • New orders in online shopping
  • Posts on social-media websites

If you want to compare Observable<T> with something more familiar, Iterable<T> is probably the closest abstraction. Just like an Iterator<T> created from an Iterable<T>, an Observable<T> can have zero to an infinite number of values of type T. Iterator is very effective at generating infinite sequences, for example, all natural numbers, as demonstrated here:

import java.math.BigInteger;
import java.util.Iterator;

class NaturalNumbersIterator implements Iterator<BigInteger> {

    private BigInteger current = BigInteger.ZERO;

    @Override
    public boolean hasNext() {
        return true; // there is always a next natural number
    }

    @Override
    public BigInteger next() {
        current = current.add(BigInteger.ONE);
        return current;
    }
}

Another similarity is the fact that Iterator itself can signal to its client that it has no more items to produce (more on that later).

Also, Observable can produce an arbitrary number of events. However, just like Iterator does not need to be backed by an underlying collection, Observable does not necessarily need to represent a stream of events.

Indeed, Observable<T> can actually produce three types of events:

  • Values of type T, as declared by Observable
  • Completion event
  • Error event

The specification of Reactive Extensions clearly states that every Observable can emit an arbitrary number of values, optionally followed by completion or error (but not both). Strictly speaking, the Rx Design Guidelines define this rule as follows: OnNext* (OnCompleted | OnError)?, where OnNext represents a new value.

Additionally, you can implement an Observable that never emits any event at all, not even completion or error. Such an Observable is useful for testing purposes, for example, to exercise timeouts.
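To make the grammar concrete, the following JDK-only sketch encodes a recorded sequence of notifications as letters (N for onNext, C for onCompleted, E for onError) and checks it against the regular expression N*[CE]?, a direct transcription of OnNext* (OnCompleted | OnError)?. The RxGrammar class is a hypothetical helper for illustration, not part of RxJava.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of RxJava: checks a recorded sequence of
// notifications against the grammar OnNext* (OnCompleted | OnError)?
final class RxGrammar {

    // One letter per notification: N = onNext, C = onCompleted, E = onError
    private static final Pattern GRAMMAR = Pattern.compile("N*[CE]?");

    private RxGrammar() {
    }

    static boolean isValid(String notifications) {
        return GRAMMAR.matcher(notifications).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("NNNC")); // true: three values, then completion
        System.out.println(isValid("NNE"));  // true: two values, then an error
        System.out.println(isValid(""));     // true: a stream that never emits
        System.out.println(isValid("NCN"));  // false: a value after completion
        System.out.println(isValid("CE"));   // false: completion and error together
    }
}
```

Note that the empty sequence is valid, which is exactly the never-emitting Observable described above.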

Subscribing to Notifications from Observable

An instance of Observable does not emit any events until someone is actually interested in receiving them. To begin watching an Observable, you use the subscribe() family of methods:

Observable<Tweet> tweets = //...
tweets.subscribe((Tweet tweet) -> System.out.println(tweet));

The preceding code snippet subscribes to the tweets Observable by registering a callback. This callback will be invoked every time the tweets stream decides to push an event downstream. The RxJava contract makes sure that your callback will not be invoked from more than one thread at a time, even though events can be emitted from many threads.

There are multiple overloaded versions of subscribe() that are more specific. We already mentioned that idiomatically Observable does not throw exceptions. Instead, exceptions are just another type of notification (event) that Observable can propagate. Therefore, you do not wrap subscribe() in a try-catch block to catch exceptions produced along the way. Instead, you provide a separate callback:

tweets.subscribe(
        (Tweet tweet) -> { System.out.println(tweet); },
        (Throwable t) -> { t.printStackTrace(); }
);

The second argument to subscribe() is optional. It is invoked when an exception occurs while producing items. It is guaranteed that no other Tweet will appear after the exception.

The third optional callback allows us to listen for stream completion:

tweets.subscribe(
        (Tweet tweet) -> { System.out.println(tweet); },
        (Throwable t) -> { t.printStackTrace(); },
        () -> { this.noMore(); }
);

Remember that RxJava is not opinionated with regard to how many items are produced, when, and when to stop. Because a stream can be infinite or can complete immediately upon subscription, it is up to the Subscriber whether it wants to receive a completion notification.

As a side note, often you can use Java 8 method references instead of lambdas to improve readability, as illustrated here:

tweets.subscribe(
        System.out::println,
        Throwable::printStackTrace,
        this::noMore);

Capturing All Notifications by Using Observer

It turns out that providing all three arguments to subscribe() is quite useful, so it would be helpful to have a simple wrapper holding all three callbacks. This is what Observer<T> was designed for.

Observer<T> is a container for all three callbacks, receiving all possible notifications from Observable<T>. Here is how you can register an Observer<T>:

Observer<Tweet> observer = new Observer<Tweet>() {
    @Override
    public void onNext(Tweet tweet) {
        System.out.println(tweet);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onCompleted() {
        noMore();
    }
};
//...
tweets.subscribe(observer);

As a matter of fact, Observer<T> is the core abstraction for listening in RxJava. Yet if you want even greater control, Subscriber (Observer's abstract implementation) is even more powerful.

Controlling Listeners by Using Subscription and Subscriber<T>

Imagine an Observer that knows in advance how many items it wants to receive or when to stop receiving them. For example, we subscribed to stock price changes, but when the price falls below $1, we no longer want to listen. Obviously, just as an Observer has the ability to subscribe, it should also be capable of unsubscribing whenever it sees fit. There are two means to support that: Subscription and Subscriber.

Subscription subscription =
        tweets.subscribe(System.out::println);
//...
subscription.unsubscribe();

Subscription is a handle that allows client code to cancel a subscription by using the unsubscribe() method. Additionally, you can query the status of a subscription by using isUnsubscribed(). It is important to unsubscribe from Observable<T> as soon as you no longer want to receive more events; this avoids memory leaks and unnecessary load on the system.
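The following is a minimal, JDK-only sketch of the behavior such a handle needs. SimpleSubscription and its add() method are hypothetical names loosely modeled on rx.Subscription, not RxJava's actual implementation. The key assumption is that unsubscribe() may be called more than once and from several threads, so it must be idempotent and thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for rx.Subscription (not RxJava's code)
final class SimpleSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
    private final List<Runnable> cleanups = new ArrayList<>(); // guarded by itself

    boolean isUnsubscribed() {
        return unsubscribed.get();
    }

    // Registers a cleanup action; runs it right away if already unsubscribed
    void add(Runnable cleanup) {
        synchronized (cleanups) {
            if (!unsubscribed.get()) {
                cleanups.add(cleanup);
                return;
            }
        }
        cleanup.run();
    }

    // Idempotent: only the first call (from any thread) runs the cleanups
    void unsubscribe() {
        if (unsubscribed.compareAndSet(false, true)) {
            List<Runnable> toRun;
            synchronized (cleanups) {
                toRun = new ArrayList<>(cleanups);
                cleanups.clear();
            }
            toRun.forEach(Runnable::run);
        }
    }

    public static void main(String[] args) {
        SimpleSubscription subscription = new SimpleSubscription();
        subscription.add(() -> System.out.println("closing connection"));
        subscription.unsubscribe(); // prints "closing connection"
        subscription.unsubscribe(); // no effect the second time
        System.out.println(subscription.isUnsubscribed()); // true
    }
}
```

The compareAndSet() call is the design choice that matters here: it guarantees cleanup actions run at most once, no matter how many times or from how many threads unsubscribe() is called.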

The code example that follows subscribes to all events, but the subscriber itself decides to give up receiving notifications under certain criteria. Normally, this can be done by using the built-in takeUntil() operator, but for the time being we can unsubscribe manually:

Subscriber<Tweet> subscriber = new Subscriber<Tweet>() {
    @Override
    public void onNext(Tweet tweet) {
        if (tweet.getText().contains("Java")) {
            unsubscribe();
        }
    }

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
};
tweets.subscribe(subscriber);

When Subscriber decides it no longer wants to receive more items, it can unsubscribe itself. As an exercise, you can implement a Subscriber that receives only the first n events and then gives up. The Subscriber class is more powerful than that, but for the time being just remember it is capable of unsubscribing itself from Observable.
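One possible solution to the exercise is sketched below. To keep it runnable without RxJava on the classpath, it uses a hypothetical, stripped-down SelfCancellingSubscriber instead of rx.Subscriber. The model assumes that the producer cooperates by checking isUnsubscribed() before every push.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal model of a self-unsubscribing subscriber
// (not RxJava's Subscriber class)
abstract class SelfCancellingSubscriber<T> {
    private volatile boolean unsubscribed;

    final void unsubscribe() {
        unsubscribed = true;
    }

    final boolean isUnsubscribed() {
        return unsubscribed;
    }

    abstract void onNext(T value);
}

// Receives at most n values, then unsubscribes itself
final class FirstN<T> extends SelfCancellingSubscriber<T> {
    private final int n;
    final List<T> received = new ArrayList<>();

    FirstN(int n) {
        this.n = n;
        if (n <= 0) {
            unsubscribe(); // not interested in anything at all
        }
    }

    @Override
    void onNext(T value) {
        received.add(value);
        if (received.size() >= n) {
            unsubscribe(); // give up after the n-th value
        }
    }
}

final class FirstNDemo {
    // An infinite producer: 1, 2, 3, ... for as long as someone listens
    static void naturals(SelfCancellingSubscriber<Integer> subscriber) {
        int i = 1;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i++);
        }
    }

    static List<Integer> firstN(int n) {
        FirstN<Integer> subscriber = new FirstN<>(n);
        naturals(subscriber);
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // [1, 2, 3, 4, 5]
    }
}
```

In real RxJava code you would rarely write this by hand, because the built-in take(n) operator covers the same need.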

Creating Observables

Most of the time while working with RxJava you will be interacting with existing Observables, typically combining, filtering, and wrapping them with one another. However, unless you work with an external API that already exposes Observables, you first must learn where Observables come from and how you can create a stream and handle subscriptions.

First, there are several factory methods that create fixed constant Observables. These are useful if you want to use RxJava consistently across an entire codebase or when values to be emitted are cheap to produce and known in advance:

Observable.just(value)

Creates an Observable instance that emits exactly one value to all future subscribers and completes afterward. Overloaded versions of the just() operator can take anything from two to nine values to be emitted.

Observable.from(values)

Similar to just() but accepts Iterable<T> or T[], thus creating an Observable<T> that emits as many values as there are elements in the values collection. Another overloaded version accepts a Future<T>, emitting an event when the underlying Future completes.

Observable.range(from, n)

Produces n integer numbers starting from from. For example, range(5, 3) will emit 5, 6, and 7 and then complete normally. Each subscriber will receive the same set of numbers.

Observable.empty()

Completes immediately after subscription, without emitting any values.

Observable.never()

Such an Observable never emits any notifications: no values, no completion, and no error. This stream is useful for testing purposes.

Observable.error()

Emits an onError() notification immediately to every subscriber. No values are emitted, and according to the contract, onCompleted() cannot occur either.

Mastering Observable.create()

Interestingly, even though RxJava is all about asynchronous processing of streams of events, the aforementioned factory methods by default operate on the client thread. Have a look at the following code sample:

private static void log(Object msg) {
    System.out.println(
            Thread.currentThread().getName() +
            ": " + msg);
}

//...

log("Before");
Observable
        .range(5, 3)
        .subscribe(i -> {
            log(i);
        });
log("After");

What we are interested in is the thread that executed each log statement:

main: Before
main: 5
main: 6
main: 7
main: After

The order of print statements is also relevant. It is not a surprise that Before and After messages are printed by the main client thread. However, notice that subscription also happened in the client thread and subscribe() actually blocked the client thread until all events were received.

Unless required by some operator, RxJava does not implicitly run your code in any thread pool. To better understand this behavior, let’s study the low-level operator used for manufacturing Observables: create():

Observable<Integer> ints = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                log("Create");
                subscriber.onNext(5);
                subscriber.onNext(6);
                subscriber.onNext(7);
                subscriber.onCompleted();
                log("Completed");
            }
        });
log("Starting");
ints.subscribe(i -> log("Element: " + i));
log("Exit");

To understand how Observable.create() works and how RxJava deals with concurrency, we will analyze the execution step by step. First, we create the ints Observable by supplying an implementation of the OnSubscribe callback interface to create() (later, we will almost always replace it with a simple lambda expression). At this point, nothing has happened yet apart from creating an instance of Observable; therefore, the first line of output we see is main: Starting. Observable defers emission of events by default, meaning it will not begin to emit any items until you actually subscribe, so the callback given to create() is not executed yet. Later, we do subscribe, ints.subscribe(...), forcing Observable to begin emitting items.

This is true for streams known as cold. Hot streams, on the other hand, emit events even if no one has subscribed.

Observable.create() is so versatile that you can in fact mimic all of the previously discussed factory methods on top of it. For example, Observable.just(x), which emits a single value x and immediately completes afterward, might look like this:

static <T> Observable<T> just(T x) {
    return Observable.create(subscriber -> {
        subscriber.onNext(x);
        subscriber.onCompleted();
    });
}
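Following the same idea, the other factory methods can be expressed on top of a create()-like primitive. The sketch below uses hypothetical SimpleObservable and SimpleObserver types (JDK only, synchronous, with no unsubscription or backpressure) rather than RxJava itself, so treat it as an illustration of the semantics, not of RxJava's real implementations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, synchronous model of an Observer (not rx.Observer)
interface SimpleObserver<T> {
    void onNext(T value);

    void onError(Throwable e);

    void onCompleted();
}

// Hypothetical model of Observable.create(); no threads, no unsubscription
final class SimpleObservable<T> {
    private final Consumer<SimpleObserver<T>> onSubscribe;

    private SimpleObservable(Consumer<SimpleObserver<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> SimpleObservable<T> create(Consumer<SimpleObserver<T>> onSubscribe) {
        return new SimpleObservable<>(onSubscribe);
    }

    // The handler runs on the calling thread, once per subscribe() call
    void subscribe(SimpleObserver<T> observer) {
        onSubscribe.accept(observer);
    }

    static <T> SimpleObservable<T> just(T x) {
        return create(o -> {
            o.onNext(x);
            o.onCompleted();
        });
    }

    static SimpleObservable<Integer> range(int from, int n) {
        return create(o -> {
            for (int i = 0; i < n; i++) {
                o.onNext(from + i);
            }
            o.onCompleted();
        });
    }

    static <T> SimpleObservable<T> empty() {
        return create(o -> o.onCompleted());
    }

    static <T> SimpleObservable<T> never() {
        return create(o -> { }); // not a single notification, ever
    }

    static <T> SimpleObservable<T> error(Throwable e) {
        return create(o -> o.onError(e));
    }

    public static void main(String[] args) {
        RecordingObserver<Integer> observer = new RecordingObserver<>();
        range(5, 3).subscribe(observer);
        System.out.println(observer.events); // [5, 6, 7, completed]
    }
}

// Records notifications as strings so they are easy to print and compare
final class RecordingObserver<T> implements SimpleObserver<T> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onNext(T value) {
        events.add(String.valueOf(value));
    }

    @Override
    public void onError(Throwable e) {
        events.add("error: " + e.getMessage());
    }

    @Override
    public void onCompleted() {
        events.add("completed");
    }
}
```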

Managing multiple subscribers

Emission does not begin until we actually subscribe. But every time subscribe() is called, our subscription handler inside create() is invoked. This is neither an advantage nor a disadvantage; it is just something you must keep in mind.

In some cases, the fact that every subscriber gets its own unique handler invocation works great. For example, Observable.just(42) should emit 42 to every subscriber, not just the first one. On the other hand, if you put a database query or heavyweight computation inside create(), it might be beneficial to share a single invocation among all subscribers.

To ensure that you truly understand how subscription works, consider the following code sample that subscribes to the same Observable twice:

Observable<Integer> ints =
        Observable.create(subscriber -> {
            log("Create");
            subscriber.onNext(42);
            subscriber.onCompleted();
        });
log("Starting");
ints.subscribe(i -> log("Element A: " + i));
ints.subscribe(i -> log("Element B: " + i));
log("Exit");

Remember that every time you subscribe to an Observable created via the create() factory method, the lambda expression passed as an argument to create() is executed independently by default within the thread that initiated the subscription:

main: Starting
main: Create
main: Element A: 42
main: Create
main: Element B: 42
main: Exit

If you would like to avoid invoking the create() handler for each subscriber and simply reuse events that were already computed, there is a handy cache() operator:

Observable<Integer> ints =
        Observable.<Integer>create(subscriber -> {
            //...
        })
        .cache();

What cache() does is stand between subscribe() and our custom Observable. When the first subscriber appears, cache() delegates subscription to the underlying Observable and forwards all notifications (events, completions, or errors) downstream. However, at the same time, it keeps a copy of all notifications internally. When a subsequent subscriber wants to receive pushed notifications, cache() no longer delegates to the underlying Observable but instead feeds cached values.

With caching, the output for two Subscribers is quite different:

main: Starting
main: Create
main: Element A: 42
main: Element B: 42
main: Exit

Of course, keep in mind that cache() plus an infinite stream is a recipe for disaster, also known as OutOfMemoryError.
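The essence of cache() can be sketched in a few lines. The model below is hypothetical and deliberately simplified: it is single-threaded, tracks only values (not completion or errors), and assumes the upstream source emits synchronously, so a second subscriber never arrives mid-stream. RxJava's real cache() handles all of those cases. The AtomicInteger counts how often the expensive handler runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of create() and cache() that tracks
// only onNext values; this is not RxJava's implementation
final class MiniCache {

    // A source is anything you can subscribe a value callback to
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    // Like create(): the handler runs again for every subscribe() call
    static Source<Integer> expensive(AtomicInteger invocations) {
        return onNext -> {
            invocations.incrementAndGet(); // stands in for a database query
            onNext.accept(42);
        };
    }

    // Like cache(): subscribes upstream only once, keeps a copy of every
    // value, and replays that copy to later subscribers
    static <T> Source<T> cache(Source<T> upstream) {
        List<T> recorded = new ArrayList<>();
        boolean[] connected = {false};
        return onNext -> {
            if (!connected[0]) {
                connected[0] = true;
                upstream.subscribe(value -> {
                    recorded.add(value);  // keep a copy...
                    onNext.accept(value); // ...and forward it downstream
                });
            } else {
                recorded.forEach(onNext); // feed cached values instead
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger invocations = new AtomicInteger();
        Source<Integer> cached = cache(expensive(invocations));
        cached.subscribe(i -> System.out.println("Element A: " + i)); // Element A: 42
        cached.subscribe(i -> System.out.println("Element B: " + i)); // Element B: 42
        System.out.println("handler invocations: " + invocations.get()); // 1
    }
}
```

Without cache(), subscribing twice to expensive() increments the counter twice, mirroring the two Create lines in the earlier output. Note also that recorded grows without bound, which is exactly why cache() on an infinite stream ends in OutOfMemoryError.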

Infinite streams

Infinite data structures are an important concept. Computer memory is finite, so having an infinite list or stream sounds impossible. But RxJava allows you to produce and consume events on the fly.

Think of a traditional queue: it can be treated as an infinite source of values, despite never keeping all of them in memory at the same time. That being said, how would you implement such an infinite stream by using create()? For example, let’s build an Observable that produces all natural numbers:

//BROKEN! Don't do this
Observable<BigInteger> naturalNumbers = Observable.create(
        subscriber -> {
            BigInteger i = ZERO;
            while (true) { //don't do this!
                subscriber.onNext(i);
                i = i.add(ONE);
            }
        });
naturalNumbers.subscribe(x -> log(x));

The presence of while(true) should trigger an alarm bell in any codebase. It may seem OK at first, but you should quickly realize that this implementation is broken. Not because it is infinite: as a matter of fact, infinite Observables are perfectly OK and quite useful, as long as they are implemented properly. The problem is that the moment you call subscribe(), the lambda expression inside create() is invoked in the context of your thread. And because this lambda never ends, subscribe() blocks forever as well.

You might ask, “Shouldn’t subscription be asynchronous rather than running the subscription handler in the client thread?” This is a valid question, so let’s spend some time introducing explicit concurrency:

Observable<BigInteger> naturalNumbers = Observable.create(
        subscriber -> {
            Runnable r = () -> {
                BigInteger i = ZERO;
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(i);
                    i = i.add(ONE);
                }
            };
            new Thread(r).start();
        });

Rather than running a blocking loop directly in the client thread, we spawn a custom thread and emit events from there. As a result, subscribe() no longer blocks the client thread, because all it does underneath is spawn a thread.

Now imagine we are not interested in all the natural numbers (there are too many of them, after all), but just the first few. We already know how to stop receiving notifications from an Observable, namely by unsubscribing:

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
//after some time...
subscription.unsubscribe();

If you paid attention to details, you probably noticed that the suspicious-looking while(true) loop was replaced with the following:

while (!subscriber.isUnsubscribed()) {
    //...
}

Although creating your own thread is not a good design decision, and RxJava has much better declarative tools for handling concurrency, the preceding code sample shows how you can properly handle unsubscription.
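The same cooperative-cancellation pattern can be reproduced with nothing but the JDK. In this hypothetical model, an AtomicBoolean stands in for subscriber.isUnsubscribed(), and setting it plays the role of subscription.unsubscribe(). The producer thread notices the change before its next emission and exits.

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical JDK-only model of the naturalNumbers example (not RxJava code):
// an AtomicBoolean plays the role of subscriber.isUnsubscribed()
final class NaturalNumbersDemo {

    // "Subscribing" starts the producer thread and returns immediately
    static Thread subscribe(Consumer<BigInteger> onNext, AtomicBoolean unsubscribed) {
        Thread producer = new Thread(() -> {
            BigInteger i = BigInteger.ZERO;
            while (!unsubscribed.get()) { // checked before every emission
                onNext.accept(i);
                i = i.add(BigInteger.ONE);
            }
        });
        producer.setDaemon(true); // never keep the JVM alive on its own
        producer.start();
        return producer;
    }

    // Lets the producer run for about `millis`, "unsubscribes", and reports
    // whether the producer thread has finished
    static boolean stopsAfterUnsubscribe(long millis) {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        Thread producer = subscribe(x -> { }, unsubscribed);
        try {
            Thread.sleep(millis);   // "after some time..."
            unsubscribed.set(true); // the equivalent of subscription.unsubscribe()
            producer.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !producer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("producer stopped: " + stopsAfterUnsubscribe(100));
    }
}
```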

Checking for unsubscription immediately before trying to send an event is fine as long as events are pushed relatively often. But imagine a situation in which events appear very rarely. Observable can only determine that a subscriber unsubscribed when it attempts to push some event to it.

Take the following factory method as an example: delayed(x) creates an Observable that emits the value x after sleeping for 10 seconds. It is similar to Observable.just(), but with an extra delay. We already know that an extra thread is needed, even though this is not the best usage pattern:

static <T> Observable<T> delayed(T x) {
    return Observable.create(
            subscriber -> {
                Runnable r = () -> {
                    sleep(10, SECONDS);
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(x);
                        subscriber.onCompleted();
                    }
                };
                new Thread(r).start();
            });
}

static void sleep(int timeout, TimeUnit unit) {
    try {
        unit.sleep(timeout);
    } catch (InterruptedException ignored) {
        //intentionally ignored
    }
}

The naive implementation spawns a new thread and goes to sleep for 10 seconds. A more robust implementation should at least use java.util.concurrent.ScheduledExecutorService, but this one is for educational purposes only. After 10 seconds, we ensure that someone is still listening, and if that is the case, we emit a single item and complete.

But what if the subscriber decides to unsubscribe one second after subscribing, long before the event is supposed to be emitted? Well, nothing really. The background thread sleeps for the remaining nine seconds just to realize the subscriber is long gone. This is what bothers us; holding the resource for an extra nine seconds seems wasteful.

Luckily, with a subscriber instance we can be notified as soon as it unsubscribes, cleaning up resources as soon as possible, not when the next message appears:

static <T> Observable<T> delayed(T x) {
    return Observable.create(
            subscriber -> {
                Runnable r = () -> {/* ... */};
                final Thread thread = new Thread(r);
                thread.start();
                subscriber.add(Subscriptions.create(thread::interrupt));
            });
}

The last line is crucial; everything else remains the same. The background thread is already running or, to be precise, sleeping for 10 seconds. Right after spawning the thread, we use Subscriber.add() to register a callback that the subscriber invokes when it unsubscribes. This callback has basically a single purpose: to interrupt the thread.

Calling Thread.interrupt() makes the blocked sleep() throw an InterruptedException, prematurely ending our 10-second pause. sleep() exits gracefully after swallowing the exception. At this point, however, subscriber.isUnsubscribed() returns true, so no event is emitted. The thread finishes immediately and no resources are wasted. You can use the same pattern to perform any cleanup. However, if your stream produces a steady, frequent flow of events, you can probably live without an explicit callback.
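The interrupt-based cleanup can be observed in isolation with a small JDK-only experiment. The names below are hypothetical; an AtomicBoolean stands in for subscriber.isUnsubscribed(). The sketch assumes, as described above, that the flag is already set by the time the cleanup callback interrupts the sleeping thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical JDK-only experiment mirroring delayed() with a cleanup callback;
// the AtomicBoolean stands in for subscriber.isUnsubscribed()
final class InterruptDemo {

    // Same helper as in the text: swallows the interruption and returns early
    static void sleep(int timeout, TimeUnit unit) {
        try {
            unit.sleep(timeout);
        } catch (InterruptedException ignored) {
            // woken up early by the cleanup callback
        }
    }

    // Unsubscribes after `millis` and reports whether the worker finished long
    // before its 10-second pause was over, without emitting anything
    static boolean stopsEarlyWithoutEmitting(long millis) {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        AtomicBoolean emitted = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            sleep(10, TimeUnit.SECONDS);
            if (!unsubscribed.get()) {
                emitted.set(true); // would be subscriber.onNext(x)
            }
        });
        worker.setDaemon(true);
        long start = System.nanoTime();
        worker.start();
        try {
            Thread.sleep(millis);   // the subscriber loses interest...
            unsubscribed.set(true); // ...isUnsubscribed() now returns true...
            worker.interrupt();     // ...and the cleanup callback interrupts the thread
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return !worker.isAlive() && !emitted.get() && elapsedMillis < 5_000;
    }

    public static void main(String[] args) {
        System.out.println("stopped early: " + stopsEarlyWithoutEmitting(1_000));
    }
}
```

If interrupt() happens to arrive before the worker has even entered sleep(), the thread's interrupt flag is already set, so sleep() throws immediately and the outcome is the same.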

There is another reason why you should not use explicit threads inside create(). Section 4.2 of the Rx Design Guidelines, “Assume observer instances are called in a serialized fashion,” requires that subscribers never receive notifications concurrently. It is easy to violate this requirement when explicit threads are involved.

Such an assumption allows writing Observers as if they were synchronized, always accessed by at most one thread. This holds true even though events can come from multiple threads. Custom implementations of Observable must ensure that this contract is met. With that in mind, look at the following code that nonidiomatically tries to parallelize loading of multiple chunks of Data:

Observable<Data> loadAll(Collection<Integer> ids) {
    return Observable.create(subscriber -> {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        AtomicInteger countDown = new AtomicInteger(ids.size());
        //DANGER, violates Rx contract. Don't do this!
        ids.forEach(id -> pool.submit(() -> {
            final Data data = load(id);
            subscriber.onNext(data);
            if (countDown.decrementAndGet() == 0) {
                pool.shutdownNow();
                subscriber.onCompleted();
            }
        }));
    });
}

This code, apart from being accidentally quite complex, violates some Rx principles. Namely, it allows calling the subscriber’s onNext() method from multiple threads concurrently. Moreover, the complexity is unnecessary: idiomatic RxJava operators such as merge() and flatMap() avoid it altogether. The good news is that even if someone implemented the Observable poorly, we can easily fix it by applying the serialize() operator, for example, loadAll(...).serialize(). This operator ensures that events are serialized and sequenced. It also enforces that no more events are sent after completion or error.
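To see what serialize() guarantees, here is a deliberately simple lock-based sketch. It is an illustration under stated assumptions, not RxJava's implementation: the real operator avoids blocking the emitting threads, whereas this version simply makes every callback synchronized and drops notifications after completion. SerializedObserver and SerializeDemo are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified, lock-based sketch of the guarantees serialize() provides;
// NOT how RxJava implements the operator
final class SerializedObserver<T> {
    private final Consumer<T> downstreamNext;
    private final Runnable downstreamCompleted;
    private boolean terminated; // guarded by "this"

    SerializedObserver(Consumer<T> downstreamNext, Runnable downstreamCompleted) {
        this.downstreamNext = downstreamNext;
        this.downstreamCompleted = downstreamCompleted;
    }

    // synchronized: at most one thread runs a callback at any time
    synchronized void onNext(T value) {
        if (!terminated) {
            downstreamNext.accept(value);
        }
    }

    // After completion, every further notification is dropped
    synchronized void onCompleted() {
        if (!terminated) {
            terminated = true;
            downstreamCompleted.run();
        }
    }
}

final class SerializeDemo {
    // Emits from several threads at once; returns {values received, max concurrency}
    static int[] run(int threads, int perThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();
        SerializedObserver<Integer> observer = new SerializedObserver<>(value -> {
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            received.incrementAndGet();
            inFlight.decrementAndGet();
        }, () -> { });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    observer.onNext(i);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        observer.onCompleted();
        observer.onNext(-1); // dropped: nothing may follow completion
        return new int[] {received.get(), maxInFlight.get()};
    }
}
```

With this wrapper, run(8, 1_000) reports 8,000 received values and a maximum concurrency of 1, and the onNext(-1) call after completion is silently dropped.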

The last aspect of creating Observables that we have not yet covered is error propagation. We’ve learned so far that Observer<T> can receive values of type T, optionally followed by either completion or error. But how do you push errors downstream to all subscribers? It is good practice to wrap the entire body of create() in a try-catch block. Throwables should be propagated downstream rather than logged or rethrown, as demonstrated here:

Observable<Data> rxLoad(int id) {
    return Observable.create(subscriber -> {
        try {
            subscriber.onNext(load(id));
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    });
}

This extra try-catch block is necessary to propagate the possible Exception thrown from, for example, load(id). Otherwise, RxJava will do its best to at least print the exception to standard output, but to build resilient streams, exceptions need to be treated as first-class citizens, not just as extra features in the language that no one truly understands.

The pattern of emitting a single value, completing, and wrapping everything in a try-catch statement is so prevalent that the built-in fromCallable() operator was introduced:

Observable<Data> rxLoad(int id) {
    return Observable.fromCallable(() ->
            load(id));
}

It is semantically equivalent but much shorter and has some other benefits over create() that you will discover later.
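The contract that fromCallable() follows can be sketched without RxJava. FromCallableDemo and its nested types are hypothetical. One design choice worth pointing out: this sketch wraps only callable.call() in the try-catch block, so that only failures of the computation itself become onError() notifications.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch of the contract fromCallable() follows; not RxJava's code
final class FromCallableDemo {

    interface Observer<T> {
        void onNext(T value);

        void onError(Throwable e);

        void onCompleted();
    }

    // Calls the Callable, then emits its result and completes, or emits the error
    static <T> void subscribe(Callable<T> callable, Observer<T> observer) {
        T value;
        try {
            value = callable.call();
        } catch (Exception e) {
            observer.onError(e); // propagate downstream, don't log or rethrow
            return;
        }
        observer.onNext(value);
        observer.onCompleted();
    }

    // Records notifications as strings
    static final class Recorder<T> implements Observer<T> {
        final List<String> log = new ArrayList<>();

        @Override
        public void onNext(T value) {
            log.add("next:" + value);
        }

        @Override
        public void onError(Throwable e) {
            log.add("error:" + e.getMessage());
        }

        @Override
        public void onCompleted() {
            log.add("completed");
        }
    }

    public static void main(String[] args) {
        Recorder<Integer> recorder = new Recorder<>();
        subscribe(() -> 42, recorder);
        subscribe(() -> { throw new IOException("db down"); }, recorder);
        System.out.println(recorder.log); // [next:42, completed, error:db down]
    }
}
```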

Timing: timer() and interval()

In later chapters, we will explore schedulers, but first let’s discover two very useful operators that use threads underneath: timer() and interval(). The former simply creates an Observable that emits a long value of zero after a specified delay and then completes:

Observable
        .timer(1, TimeUnit.SECONDS)
        .subscribe((Long zero) -> log(zero));

As silly as it sounds, timer() is extremely useful. It is basically an asynchronous equivalent of Thread.sleep(). Rather than blocking the current thread, we create an Observable and subscribe() to it. It will become significantly more important after we learn how to compose simple Observables into more complex computations. The fixed value of 0 (in variable zero) is just a convention without any specific meaning. However, it makes more sense when interval() is introduced. interval() generates a sequence of long numbers, beginning with zero, with a fixed delay between each one of them:

Observable
        // 1_000_000 / 60 is 16,666 microseconds: roughly 60 events per second
        .interval(1_000_000 / 60, MICROSECONDS)
        .subscribe((Long i) -> log(i));

Observable.interval() produces a sequence of consecutive long numbers, beginning with 0. However, unlike range(), interval() places a fixed delay before every event, including the first one.
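For intuition, both operators map naturally onto java.util.concurrent.ScheduledExecutorService. The helpers below are hypothetical JDK-only analogs, not RxJava's implementation. timer() schedules a single task that emits 0, and interval() uses fixed-rate scheduling whose initial delay equals the period, so even the first value waits one full period.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical JDK-only analogs of timer() and interval(), not RxJava's code
final class TimingDemo {
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timing");
                t.setDaemon(true); // don't keep the JVM alive
                return t;
            });

    // Like timer(): emits the single value 0 after the delay
    static ScheduledFuture<?> timer(long delay, TimeUnit unit, LongConsumer onNext) {
        return SCHEDULER.schedule(() -> onNext.accept(0L), delay, unit);
    }

    // Like interval(): emits 0, 1, 2, ... at a fixed rate; the initial delay
    // equals the period, so even the first value waits one full period
    static ScheduledFuture<?> interval(long period, TimeUnit unit, LongConsumer onNext) {
        AtomicLong counter = new AtomicLong();
        return SCHEDULER.scheduleAtFixedRate(
                () -> onNext.accept(counter.getAndIncrement()), period, period, unit);
    }

    // Waits for the timer value; returns -1 if nothing arrives within 5 seconds
    static long timerValue() {
        CompletableFuture<Long> result = new CompletableFuture<>();
        timer(10, TimeUnit.MILLISECONDS, v -> result.complete(v));
        try {
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException | TimeoutException e) {
            return -1;
        }
    }

    // Collects the first n interval values, then cancels the schedule
    static List<Long> firstTicks(int n, long periodMillis) {
        List<Long> ticks = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        ScheduledFuture<?> task = interval(periodMillis, TimeUnit.MILLISECONDS, i -> {
            if (done.getCount() > 0) { // callbacks run one at a time on "timing"
                ticks.add(i);
                done.countDown();
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        task.cancel(false); // the equivalent of unsubscribing
        return new ArrayList<>(ticks);
    }
}
```

Cancelling the returned ScheduledFuture plays the role that unsubscribing plays in RxJava: no further values are scheduled.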

Hot and Cold Observables

After you get an instance of Observable, it is important to understand whether the stream is hot or cold. The API and semantics remain the same, but the way you use Observable will depend on the type.

A cold Observable is entirely lazy and never begins to emit events until someone is actually interested. If there are no observers, Observable is just a static data structure. This also implies that every subscriber receives its own copy of the stream, because events are produced lazily and typically not cached in any way.

Cold Observables typically come from Observable.create(), which idiomatically should not start any logic but instead postpone it until someone actually listens. A cold Observable is thus somewhat dependent on Subscriber. Examples of cold Observables, apart from create(), include Observable.just(), from(), and range(). Subscribing to a cold Observable often involves a side effect happening inside create(). For example, the database is queried or a connection is opened.

Hot Observables are different. After you get hold of such an Observable, it might already be emitting events, no matter how many Subscribers it has. Observable pushes events downstream even if no one listens, so events may be missed.

Whereas you typically have full control over cold Observables, hot Observables are independent of consumers. When a Subscriber appears, a hot Observable behaves like a wire tap, transparently publishing events flowing through it. The presence or absence of Subscriber does not alter the behavior of Observable; it is entirely decoupled and independent.

Hot Observables typically occur when we have absolutely no control over the source of events. Examples of such Observables include mouse movements, keyboard inputs, or button clicks. So far, we haven’t even mentioned the user interface, but it turns out that RxJava fits perfectly when implementing user interfaces. This library is especially appreciated in the Android community, where it helps replace nested callbacks with a flat composition of streams.

The distinction between hot and cold becomes essential when we rely on delivery of events. No matter when you subscribe to a cold Observable, immediately or after hours, you always receive a complete and consistent set of events. On the other hand, if the Observable is hot, you can never be sure you received all events from the beginning.

Another interesting distinction between hot and cold sources is time dependency. A cold Observable produces values on demand and possibly multiple times, so the exact instant when an item was created is irrelevant. Conversely, hot Observables represent events as they come, typically from some external source. This means that the instant when a given value was generated is very significant, because it places the event on the timescale.
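The difference shows up clearly in a small JDK-only model. Both classes are hypothetical. ColdSource replays its whole data set to each subscriber on demand, while HotSource pushes events to whoever happens to be listening at that moment, so a late subscriber misses everything published before it arrived.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical JDK-only model contrasting cold and hot sources (not RxJava)
final class HotColdDemo {

    // Cold: nothing happens until subscribe(); every subscriber triggers its
    // own run of the producer and receives the complete sequence
    static final class ColdSource {
        private final List<String> data;

        ColdSource(List<String> data) {
            this.data = data;
        }

        void subscribe(Consumer<String> onNext) {
            data.forEach(onNext);
        }
    }

    // Hot: events are pushed whether or not anyone listens; a subscriber only
    // sees what is published after it subscribed, like a wire tap
    static final class HotSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> onNext) {
            listeners.add(onNext);
        }

        void publish(String event) {
            listeners.forEach(listener -> listener.accept(event));
        }
    }

    public static void main(String[] args) {
        ColdSource cold = new ColdSource(Arrays.asList("a", "b", "c"));
        List<String> lateCold = new ArrayList<>();
        cold.subscribe(lateCold::add);
        System.out.println("cold: " + lateCold); // cold: [a, b, c]

        HotSource hot = new HotSource();
        List<String> lateHot = new ArrayList<>();
        hot.publish("a"); // nobody listens: this event is lost
        hot.publish("b");
        hot.subscribe(lateHot::add);
        hot.publish("c");
        System.out.println("hot: " + lateHot); // hot: [c]
    }
}
```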

Use Case: From Callback API to Observable Stream

Luckily, when a domain is inherently asynchronous, you will most likely find some sort of callback-based API, as is prevalent in JavaScript, for example. These APIs accept some form of callback, typically an interface with a bunch of methods that you implement in order to be notified about various events.

The most striking example of such an API is almost every graphical user interface library out there: for example, Swing. When various listeners like onClick() or onKeyUp() are used, callbacks are certainly inevitable. If you’ve worked in such environments, the term callback hell is definitely familiar to you. Callbacks have a tendency to nest in one another, so coordinating multiple callbacks is virtually impossible. Here is an example of a callback nested four times:

button.setOnClickListener(view -> {
    MyApi.asyncRequest(response -> {
        Thread thread = new Thread(() -> {
            int year = datePicker.getYear();
            runOnUiThread(() -> {
                button.setEnabled(false);
                button.setText("" + year);
            });
        });
        thread.setDaemon(true);
        thread.start();
    });
});

The simplest requirements, like reacting when two callbacks are invoked shortly after each other, become a nightmare, made even worse by multithreading. In this section, we will refactor a callback-based API into RxJava with all the benefits, such as control over threads, lifecycle, and cleanup.

For the purpose of this exercise, we will use the open source Twitter4J library that can push a subset of new tweets using a callback-based API. Twitter4J was chosen as a good example of an API using callbacks with an interesting domain. The simplest working example of reading tweets in real-time might look like this:

import java.util.concurrent.TimeUnit;

import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusListener() {
    @Override
    public void onStatus(Status status) {
        log.info("Status: {}", status);
    }

    @Override
    public void onException(Exception ex) {
        log.error("Error callback", ex);
    }

    //other callbacks
});
twitterStream.sample();
TimeUnit.SECONDS.sleep(10);
twitterStream.shutdown();

Calling twitterStream.sample() starts a background thread that logs in to Twitter and awaits new messages. Every time a tweet appears, the onStatus() callback is executed. Because execution can jump between threads, we can no longer rely on throwing exceptions. Instead, the onException() notification is used.

Overall, it does not look that bad; the problem is that this program does not do anything useful. In real life, you would probably process each Status message (tweet) somehow, for example, save it to a database or feed it to a machine-learning algorithm. You can technically put that logic inside the callback, but this couples the infrastructural call with the business logic. Simple delegation to a separate class is better, but unfortunately not reusable. What we really want is a clean separation between the technical domain (consuming data from an HTTP connection) and the business domain (interpreting input data). So we build a second layer of callbacks:

void consume(
        Consumer<Status> onStatus,
        Consumer<Exception> onException) {
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusListener() {
        @Override
        public void onStatus(Status status) {
            onStatus.accept(status);
        }

        @Override
        public void onException(Exception ex) {
            onException.accept(ex);
        }

        //other callbacks
    });
    twitterStream.sample();
}

By adding this one extra level of abstraction, we can now reuse the consume() method in various ways. Imagine that instead of logging, you have persistence, analytics, or fraud detection:

consume(
        status -> log.info("Status: {}", status),
        ex -> log.error("Error callback", ex)
);

But we just shifted the problem up in the hierarchy. What if we want to count the number of tweets per second? Or consume just the first five? And what if we would like to have multiple listeners? In that case, each call to consume() opens its own HTTP connection. Last but not least, this API does not allow unsubscribing when we are done, risking a resource leak.

We hope you realize that we are heading toward an Rx-powered API. Rather than passing callbacks down to the place where they can be executed, we can return an Observable<Status> and let everyone subscribe whenever they want. However, keep in mind that the following implementation still opens a new network connection for each Subscriber:

Observable<Status> observe() {
    return Observable.create(subscriber -> {
        TwitterStream twitterStream =
                new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                subscriber.onNext(status);
            }
            @Override
            public void onException(Exception ex) {
                subscriber.onError(ex);
            }
            //other callbacks
        });
        twitterStream.sample();
    });
}

At this point, we can simply call observe(), which only creates an Observable and does not contact the external server. As we learned earlier, unless someone actually subscribes, the body of create() is not executed. Subscribing looks very similar to the callback version:

1
2
3
4
observe().subscribe(
status -> log.info("Status: {}", status),
ex -> log.error("Error callback", ex)
);

The big difference here, compared to consume(...), is that we are not forced to pass callbacks as arguments to observe(). Instead, we can return Observable<Status>, pass it around, store it somewhere, and use it whenever and wherever we need it. One important aspect that we have not covered yet is resource clean-up. When someone unsubscribes, we should shut down TwitterStream to avoid a resource leak. We already know two techniques for that; let’s use the simpler one first:

@Override
public void onStatus(Status status) {
    if (subscriber.isUnsubscribed()) {
        twitterStream.shutdown();
    } else {
        subscriber.onNext(status);
    }
}
@Override
public void onException(Exception ex) {
    if (subscriber.isUnsubscribed()) {
        twitterStream.shutdown();
    } else {
        subscriber.onError(ex);
    }
}

When someone subscribes to receive only a small fraction of the stream, our Observable will make sure to clean up the resources. However, with this technique the clean-up happens only when the next upstream event arrives. The second technique we know does not need to wait for an upstream event: the moment a subscriber unsubscribes, shutdown() is called immediately, rather than when the next tweet arrives (last line):

twitterStream.addListener(new StatusListener() {
    //callbacks...
});
twitterStream.sample();
subscriber.add(Subscriptions.create(twitterStream::shutdown));
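The two clean-up techniques differ mainly in timing. The following toy model is plain Java, not RxJava: ToySubscriber, ToyStream and CleanupDemo are hypothetical stand-ins for Subscriber, TwitterStream and the listener. It shows that checking isUnsubscribed() releases the resource only once another event arrives, whereas a clean-up action registered up front runs the moment unsubscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for rx.Subscriber: an unsubscribed flag plus
// clean-up actions that run when unsubscribe() is called.
class ToySubscriber {
    private boolean unsubscribed;
    private final List<Runnable> cleanups = new ArrayList<>();

    boolean isUnsubscribed() {
        return unsubscribed;
    }

    // Plays the role of subscriber.add(Subscriptions.create(action)).
    void add(Runnable cleanup) {
        cleanups.add(cleanup);
    }

    void unsubscribe() {
        if (!unsubscribed) {
            unsubscribed = true;
            cleanups.forEach(Runnable::run);
        }
    }
}

// Hypothetical stand-in for TwitterStream that only records shutdown().
class ToyStream {
    boolean shutDown;

    void shutdown() {
        shutDown = true;
    }
}

class CleanupDemo {

    // Technique 1: the listener checks the flag on every upstream event.
    static void onStatus(ToySubscriber subscriber, ToyStream stream, String status) {
        if (subscriber.isUnsubscribed()) {
            stream.shutdown();
        } else {
            System.out.println("onNext: " + status);
        }
    }

    // Returns {shut down right after unsubscribe?, shut down after the next event?}
    static boolean[] polling() {
        ToySubscriber subscriber = new ToySubscriber();
        ToyStream stream = new ToyStream();
        subscriber.unsubscribe();
        boolean afterUnsubscribe = stream.shutDown;  // false: still connected
        onStatus(subscriber, stream, "next tweet");  // only now is it cleaned up
        return new boolean[] {afterUnsubscribe, stream.shutDown};
    }

    // Technique 2: the clean-up action is registered up front.
    static boolean callback() {
        ToySubscriber subscriber = new ToySubscriber();
        ToyStream stream = new ToyStream();
        subscriber.add(stream::shutdown);
        subscriber.unsubscribe();
        return stream.shutDown;                      // true: released immediately
    }
}
```

Unlike the real rx.Subscriber, this toy is not thread-safe and does not immediately run an action added after unsubscription; it only models the difference in timing.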

Interestingly, this Observable blurs the difference between hot and cold streams. On one hand, it represents external events that appear without our control (hot behavior). On the other hand, events will not begin flowing (no underlying HTTP connection) to our system until we actually subscribe().

One more side effect that we forgot about is still creeping in: every new subscribe() will start a new background thread and new connection to an external system. The same instance of Observable<Status> should be reusable across many subscribers, and because Observable is lazy, you should technically be able to call observe() once upon startup and keep it in some singleton. But the current implementation simply opens a new connection, effectively fetching the same data multiple times from the network, for each Subscriber.
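This cost follows directly from laziness, and it can be reproduced without any library. In the toy model below (ToyObservable and LazinessDemo are hypothetical classes, not RxJava's), create() merely stores the subscription logic and every subscribe() runs it again. Creating the Observable therefore opens no connection, while two subscribers open two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy stand-in for Observable: it only stores "what to do per subscriber".
class ToyObservable<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    private ToyObservable(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Like Observable.create(): the lambda is stored, not executed.
    static <T> ToyObservable<T> create(Consumer<Consumer<T>> onSubscribe) {
        return new ToyObservable<>(onSubscribe);
    }

    // Every subscription runs the stored lambda again, for this subscriber only.
    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }
}

class LazinessDemo {
    // The counter stands in for "HTTP connections opened".
    static ToyObservable<String> observe(AtomicInteger connections) {
        return ToyObservable.create(subscriber -> {
            connections.incrementAndGet();
            subscriber.accept("status");
        });
    }

    public static void main(String[] args) {
        AtomicInteger connections = new AtomicInteger();
        ToyObservable<String> statuses = observe(connections);
        System.out.println(connections.get()); // 0: creating is not connecting
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        statuses.subscribe(first::add);
        statuses.subscribe(second::add);
        System.out.println(connections.get()); // 2: one connection per subscriber
    }
}
```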

We certainly want to register multiple Subscribers of that stream, but there is no reason why every Subscriber should fetch the same data independently. What we really want is a pub-sub behavior wherein one publisher (external system) delivers data to multiple Subscribers. In theory, the cache() operator can do that, but we don’t want to buffer old events forever. We will now explore some solutions to this problem.

Manually Managing Subscribers

Manually keeping track of all subscribers and shutting down the connection to the external system only when all subscribers leave is a Sisyphean task, but we will implement it anyway, just to appreciate the idiomatic solutions later on. The idea is to keep track of all subscribers in some sort of Set<Subscriber<Status>>, start the external connection when the set becomes nonempty, and shut it down when the set becomes empty again:

//DON'T DO THIS, very brittle and error prone
class LazyTwitterObservable {
    private final Set<Subscriber<? super Status>> subscribers =
            new CopyOnWriteArraySet<>();
    private final TwitterStream twitterStream;
    public LazyTwitterObservable() {
        this.twitterStream = new TwitterStreamFactory().getInstance();
        this.twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                subscribers.forEach(s -> s.onNext(status));
            }
            @Override
            public void onException(Exception ex) {
                subscribers.forEach(s -> s.onError(ex));
            }
            //other callbacks
        });
    }
    private final Observable<Status> observable = Observable.create(
            subscriber -> {
                register(subscriber);
                subscriber.add(Subscriptions.create(() ->
                        this.deregister(subscriber)));
            });
    Observable<Status> observe() {
        return observable;
    }
    private synchronized void register(Subscriber<? super Status> subscriber) {
        if (subscribers.isEmpty()) {
            subscribers.add(subscriber);
            twitterStream.sample();
        } else {
            subscribers.add(subscriber);
        }
    }
    private synchronized void deregister(Subscriber<? super Status> subscriber) {
        subscribers.remove(subscriber);
        if (subscribers.isEmpty()) {
            twitterStream.shutdown();
        }
    }
}

The subscribers set stores the currently subscribed Subscribers in a thread-safe way. Every time a new Subscriber appears, we add it to the set and, if it is the first one, lazily connect to the underlying source of events. Conversely, when the last Subscriber disappears, we shut down the upstream source.

The key here is to always have exactly one connection to the upstream system rather than one connection per subscriber. This works and is quite robust; however, the implementation is low-level and error-prone. Access to the subscribers set must be synchronized, but the collection itself must also support safe iteration. The call to register() must come before adding the deregister() callback; otherwise, the latter could be called before we register.
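The remark about safe iteration is easy to underestimate. A Subscriber may, for example, unsubscribe from within its own onNext() after receiving the first five tweets, which removes it from the set while onStatus() is still iterating over it. This plain-JDK sketch (SafeIterationDemo and its methods are illustrative names) shows that a HashSet fails with ConcurrentModificationException in that situation, while the CopyOnWriteArraySet used by LazyTwitterObservable iterates over a snapshot and carries on.

```java
import java.util.ConcurrentModificationException;
import java.util.Set;

class SafeIterationDemo {

    // Simulates delivering one event to every subscriber, where each
    // subscriber "unsubscribes" (removes itself) while being notified.
    // Returns how many subscribers were notified.
    static int deliverWhileUnsubscribing(Set<String> subscribers) {
        int notified = 0;
        for (String subscriber : subscribers) {
            notified++;
            subscribers.remove(subscriber); // unsubscribe from inside onNext()
        }
        return notified;
    }

    // True if delivery blew up because the set was modified during iteration.
    static boolean failsDuringDelivery(Set<String> subscribers) {
        try {
            deliverWhileUnsubscribing(subscribers);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

With three subscribers in a CopyOnWriteArraySet, all three are notified and the set ends up empty; the same scenario with a plain HashSet fails on the second element.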

There must be a better way to implement such a common scenario of multiplexing a single upstream source to multiple Observers. Luckily, there are at least two such mechanisms. RxJava is all about reducing such dangerous boilerplate and abstracting away concurrency.

rx.subjects.Subject

The Subject class is quite interesting because it extends Observable and implements Observer at the same time. What that means is that you can treat it as Observable on the client side (subscribing to upstream events) and as Observer on the provider side (pushing events downstream on demand by calling onNext() on it).

Typically, what you do is keep a reference to Subject internally so that you can push events from any source you like but externally expose this Subject as Observable.

Let’s reimplement streaming Status updates using Subject. To further simplify the implementation, we connect to the external system eagerly and do not keep track of subscribers. Apart from simplifying our example, this has the benefit of lower latency when the first Subscriber appears: events are already flowing, so we don’t need to wait for a connection to some third-party application:

class TwitterSubject {
    private final PublishSubject<Status> subject = PublishSubject.create();
    public TwitterSubject() {
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusListener() {
            @Override
            public void onStatus(Status status) {
                subject.onNext(status);
            }
            @Override
            public void onException(Exception ex) {
                subject.onError(ex);
            }
            //other callbacks
        });
        twitterStream.sample();
    }
    public Observable<Status> observe() {
        return subject;
    }
}

PublishSubject is one of the flavors (subclasses) of Subject. We eagerly begin receiving events from the upstream system and simply push them (by calling subject.onNext(...)) to all Subscribers. Subject keeps track of its Subscribers internally, so we no longer need to.

Notice how we simply return subject in observe(), pretending it is a plain Observable. Now when someone subscribes, that Subscriber will receive every subsequent event as soon as onNext() is called on the backend, at least until it unsubscribes.
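The bookkeeping that PublishSubject takes over is essentially what LazyTwitterObservable did by hand. The toy below is plain Java, not RxJava: ToySource, ToyPublishSubject and PublishDemo are hypothetical names, and errors, completion and backpressure are ignored. It also mirrors the idea of keeping the subject private while exposing only a read-only view: code holding a ToySource can subscribe but cannot push events. A subscriber that arrives late sees only what is pushed after it subscribed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// The read-only view handed out to clients, playing the role of Observable.
interface ToySource<T> {
    // Returns an action that unsubscribes the given subscriber.
    Runnable subscribe(Consumer<T> subscriber);
}

// Toy model of PublishSubject: producers call onNext(), consumers subscribe().
class ToyPublishSubject<T> implements ToySource<T> {
    private final Set<Consumer<T>> subscribers = new CopyOnWriteArraySet<>();

    @Override
    public Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    // Pushes to whoever is subscribed right now; nothing is buffered.
    void onNext(T value) {
        subscribers.forEach(s -> s.accept(value));
    }
}

class PublishDemo {
    // Events pushed before a subscription are simply missed by that subscriber.
    static List<List<String>> run() {
        ToyPublishSubject<String> subject = new ToyPublishSubject<>();
        ToySource<String> exposed = subject; // what observe() would return
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();
        exposed.subscribe(early::add);
        subject.onNext("A");
        Runnable unsubscribeLate = exposed.subscribe(late::add);
        subject.onNext("B");
        unsubscribeLate.run();
        subject.onNext("C");
        return Arrays.asList(early, late); // [[A, B, C], [B]]
    }
}
```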

Subject is a useful tool for creating Observable instances when Observable.create(...) seems too complex to manage. Other types of Subjects include the following:

AsyncSubject

Remembers the last emitted value and pushes it to subscribers when onCompleted() is called. As long as AsyncSubject has not completed, all events except the last one are discarded.

BehaviorSubject

Pushes all events emitted after the subscription happened, just like PublishSubject. However, it first emits the most recent event that occurred just before the subscription. This allows a Subscriber to be immediately notified about the state of the stream. For example, a Subject may represent the current temperature broadcast every minute. When a client subscribes, it receives the last seen temperature immediately rather than waiting up to a minute for the next event. But the same Subscriber is not interested in historical temperatures, only the last one. If no events have been emitted yet, a special default event is pushed first (if provided).

ReplaySubject

The most interesting type of Subject, which caches events pushed through its entire history. When someone subscribes, they first receive a batch of missed (cached) events and only then later events in real time. By default, all events since the creation of this Subject are cached.

This can become dangerous if the stream is infinite or very long. In that case, there are factory methods of ReplaySubject that keep only the following:

  • Configurable number of events in memory (createWithSize())
  • Configurable time window of most recent events (createWithTime())
  • Both a size and a time constraint (whichever limit is reached first), with createWithTimeAndSize()
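The three flavors differ only in what a subscriber gets to see. The toy models below are plain-Java simplifications, not RxJava code. The class names are hypothetical, and errors and concurrency are ignored. They capture just that difference: ToyBehaviorSubject pushes the latest value (or a default) on subscription, ToyReplaySubject keeps the most recent maxSize values in the spirit of createWithSize(), and ToyAsyncSubject delivers only the last value, and only on completion.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy BehaviorSubject: a new subscriber first receives the latest value (or the default).
class ToyBehaviorSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest; // null means there is nothing to push on subscription

    ToyBehaviorSubject(T defaultValue) { this.latest = defaultValue; }

    void subscribe(Consumer<T> s) {
        if (latest != null) {
            s.accept(latest);
        }
        subscribers.add(s);
    }

    void onNext(T value) {
        latest = value;
        subscribers.forEach(s -> s.accept(value));
    }
}

// Toy ReplaySubject bounded by size, in the spirit of createWithSize(maxSize).
class ToyReplaySubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxSize;

    ToyReplaySubject(int maxSize) { this.maxSize = maxSize; }

    void subscribe(Consumer<T> s) {
        buffer.forEach(s); // missed (cached) events first
        subscribers.add(s);
    }

    void onNext(T value) {
        buffer.addLast(value);
        if (buffer.size() > maxSize) {
            buffer.removeFirst(); // evict the oldest cached event
        }
        subscribers.forEach(s -> s.accept(value));
    }
}

// Toy AsyncSubject: only the last value is delivered, and only on completion.
class ToyAsyncSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T last;
    private boolean completed;

    void subscribe(Consumer<T> s) {
        if (!completed) {
            subscribers.add(s);
        } else if (last != null) {
            s.accept(last); // late subscribers still get the final value
        }
    }

    void onNext(T value) {
        last = value; // earlier values are simply overwritten
    }

    void onCompleted() {
        completed = true;
        if (last != null) {
            subscribers.forEach(s -> s.accept(last));
        }
    }
}

class SubjectFlavorsDemo {
    public static void main(String[] args) {
        ToyBehaviorSubject<String> temperature = new ToyBehaviorSubject<>("no reading yet");
        temperature.onNext("20C");
        temperature.onNext("21C");
        temperature.subscribe(t -> System.out.println("behavior: " + t)); // 21C, then 22C
        temperature.onNext("22C");
        ToyReplaySubject<String> replay = new ToyReplaySubject<>(2);
        replay.onNext("a");
        replay.onNext("b");
        replay.onNext("c");
        replay.subscribe(v -> System.out.println("replay: " + v)); // b, c, then d
        replay.onNext("d");
        ToyAsyncSubject<String> async = new ToyAsyncSubject<>();
        async.subscribe(v -> System.out.println("async: " + v)); // only y
        async.onNext("x");
        async.onNext("y");
        async.onCompleted();
    }
}
```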

Subjects should be treated with caution: often there are more idiomatic ways of sharing subscriptions and caching events, for example ConnectableObservable. For the time being, prefer the relatively low-level Observable.create() or, even better, standard factory methods like from() and just().

ConnectableObservable

ConnectableObservable is an interesting way of coordinating multiple Subscribers and sharing a single underlying subscription. ConnectableObservable is a type of Observable that ensures the upstream Observable sees at most one Subscriber at any time, even though in reality many downstream Subscribers may be sharing the same underlying resource.

There are many applications of ConnectableObservable; for example, making sure all Subscribers receive the same sequence of events regardless of when they subscribed. ConnectableObservable can also force subscription if it generates important side effects, even when no “real” Subscriber has appeared yet.

Subjects are imperative ways of creating Observables, whereas ConnectableObservable shields the original upstream Observable and guarantees at most one Subscriber reaches it. No matter how many Subscribers connect to ConnectableObservable, it opens just one subscription to the Observable from which it was created.

Single Subscription with publish().refCount()

Let us recap: we have a single handle to the underlying resource; for example, HTTP connection to stream of Twitter status updates. However, an Observable pushing these events will be shared among multiple Subscribers. The naive implementation of this Observable created earlier had no control over this; therefore, each Subscriber started its own connection. This is quite wasteful:

Observable<Status> observable = Observable.create(subscriber -> {
    System.out.println("Establishing connection");
    TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    //...
    subscriber.add(Subscriptions.create(() -> {
        System.out.println("Disconnecting");
        twitterStream.shutdown();
    }));
    twitterStream.sample();
});

When we try to use this Observable, each Subscriber establishes a new connection, like so:

Subscription sub1 = observable.subscribe();
System.out.println("Subscribed 1");
Subscription sub2 = observable.subscribe();
System.out.println("Subscribed 2");
sub1.unsubscribe();
System.out.println("Unsubscribed 1");
sub2.unsubscribe();
System.out.println("Unsubscribed 2");

Here is the output:

Establishing connection
Subscribed 1
Establishing connection
Subscribed 2
Disconnecting
Unsubscribed 1
Disconnecting
Unsubscribed 2

This time, to simplify, we use a parameterless subscribe() method that triggers subscription but drops all events and notifications. After spending almost half of the chapter fighting with this problem and familiarizing ourselves with plenty of RxJava features, we can finally introduce the most scalable and simplest solution: the publish().refCount() pair:

Observable<Status> lazy = observable.publish().refCount();
//...
System.out.println("Before subscribers");
Subscription sub1 = lazy.subscribe();
System.out.println("Subscribed 1");
Subscription sub2 = lazy.subscribe();
System.out.println("Subscribed 2");
sub1.unsubscribe();
System.out.println("Unsubscribed 1");
sub2.unsubscribe();
System.out.println("Unsubscribed 2");

The output is exactly what we expect:

Before subscribers
Establishing connection
Subscribed 1
Subscribed 2
Unsubscribed 1
Disconnecting
Unsubscribed 2

The connection is not established until we actually get the first Subscriber. But, more important, the second Subscriber does not initiate a new connection; it does not even touch the original Observable. The publish().refCount() tandem wraps the underlying Observable and intercepts all subscriptions.

What refCount() does is basically count how many Subscribers are active at the moment, much like reference counting in historic garbage-collection algorithms. When this number goes from zero to one, it subscribes to the upstream Observable. Any count above one changes nothing: the same upstream subscription is simply shared among all downstream Subscribers. However, when the very last downstream Subscriber unsubscribes, the counter drops from one to zero and refCount() knows it must unsubscribe from upstream right away.

publish().refCount() is used so frequently that it has an alias named share(). Keep in mind that if the last unsubscription is shortly followed by a new subscription, share() still reconnects to the upstream Observable, as if there were no caching at all.
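The counting logic is small enough to sketch by hand. The class below is a simplified, single-threaded model of the idea behind publish().refCount(). ToyRefCount and RefCountDemo are hypothetical; the real operator is thread-safe and considerably more involved. The upstream connection is opened when the count goes from zero to one and closed when it drops back to zero. Replaying the subscribe/unsubscribe sequence from the example above reproduces its printed log, and a later resubscription opens a fresh connection, as described for share().

```java
import java.util.ArrayList;
import java.util.List;

class ToyRefCount {
    private final List<String> log;
    private int subscribers;

    ToyRefCount(List<String> log) {
        this.log = log;
    }

    // Returns an action that unsubscribes this particular subscriber (at most once).
    Runnable subscribe() {
        if (subscribers++ == 0) {
            log.add("Establishing connection"); // 0 -> 1: connect upstream
        }
        boolean[] done = {false};
        return () -> {
            if (!done[0]) {
                done[0] = true;
                if (--subscribers == 0) {
                    log.add("Disconnecting");   // 1 -> 0: disconnect upstream
                }
            }
        };
    }
}

class RefCountDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyRefCount lazy = new ToyRefCount(log);
        log.add("Before subscribers");
        Runnable sub1 = lazy.subscribe();
        log.add("Subscribed 1");
        Runnable sub2 = lazy.subscribe();
        log.add("Subscribed 2");
        sub1.run();
        log.add("Unsubscribed 1");
        sub2.run();
        log.add("Unsubscribed 2");
        // Subscribing again after the count reached zero reconnects.
        Runnable sub3 = lazy.subscribe();
        sub3.run();
        return log;
    }
}
```

The first seven log entries match the output shown earlier; the last two show the reconnection.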

ConnectableObservable Lifecycle

Another use of the publish() operator is forcing a subscription in the absence of any Subscriber. Imagine that we have our Observable<Status>. Before we expose it to our clients, we want to store each event in the database, regardless of whether someone is subscribed. A naive approach is not sufficient:

Observable<Status> tweets = //...
return tweets
        .doOnNext(this::saveStatus);

We are using the doOnNext() operator, which peeks at every item that flows through the stream and performs some action, like saveStatus(). However, remember that Observables are lazy by design; therefore, as long as no one subscribes, doOnNext() is not triggered.

What we want is a fake Observer that does not really listen to events but forces upstream Observables to produce events. There is actually an overloaded version of subscribe() that does exactly this:

Observable<Status> tweets = //...
tweets
        .doOnNext(this::saveStatus)
        .subscribe();

This empty Subscriber at the end invokes Observable.create() and connects to the upstream source of events. This seems to solve the problem, but we again forgot to protect ourselves from multiple subscribers. If we expose tweets outside, a second subscriber will make a second attempt to connect to the external resource, for example by opening a second HTTP connection.

The idiomatic solution is the publish().connect() duet, which immediately creates an artificial Subscriber while keeping just one upstream Subscriber. This is best explained with an example:

ConnectableObservable<Status> published = tweets.publish();
published.connect();

Anyone who subscribes to a ConnectableObservable is placed in a set of Subscribers. As long as connect() is not called, these Subscribers are put on hold; they never directly subscribe to the upstream Observable. However, when connect() is called, a dedicated mediating Subscriber subscribes to the upstream Observable (tweets), no matter how many downstream Subscribers appeared before, even if there were none. If some Subscribers of the ConnectableObservable were put on hold, they will all receive the same sequence of notifications.
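The hold-then-connect behavior can also be modeled in a few lines. In this toy sketch, ToyConnectable and ConnectDemo are hypothetical; the model is single-threaded and ignores errors and unsubscription. Subscribers that arrive before connect() are only recorded. connect() then subscribes to the upstream exactly once and fans every event out to all of them, so both receive an identical sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy ConnectableObservable: subscribers wait until connect() is called.
class ToyConnectable<T> {
    // Upstream logic: given a sink, pushes its events into it (like create()).
    private final Consumer<Consumer<T>> upstream;
    private final List<Consumer<T>> onHold = new ArrayList<>();

    ToyConnectable(Consumer<Consumer<T>> upstream) {
        this.upstream = upstream;
    }

    // Only records the subscriber; the upstream is not touched yet.
    void subscribe(Consumer<T> subscriber) {
        onHold.add(subscriber);
    }

    // One mediating subscription to upstream, fanned out to everyone on hold.
    void connect() {
        upstream.accept(value -> onHold.forEach(s -> s.accept(value)));
    }
}

class ConnectDemo {
    // Returns {upstream subscriptions before connect(), after connect()}.
    static int[] run(List<String> componentA, List<String> componentB) {
        AtomicInteger upstreamSubscriptions = new AtomicInteger();
        ToyConnectable<String> published = new ToyConnectable<String>(sink -> {
            upstreamSubscriptions.incrementAndGet();
            sink.accept("first");
            sink.accept("second");
        });
        published.subscribe(componentA::add); // e.g., subscribed during startup
        published.subscribe(componentB::add);
        int before = upstreamSubscriptions.get();
        published.connect();
        return new int[] {before, upstreamSubscriptions.get()};
    }
}
```

Before connect() the upstream has not been subscribed at all; afterwards it has been subscribed exactly once, and both components hold ["first", "second"].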

This mechanism has multiple advantages. Imagine that you have an Observable in your application in which multiple Subscribers are interested. On startup, several components (e.g., Spring beans or EJBs) subscribe to that Observable and begin listening. Without ConnectableObservable, it is very likely that a hot Observable will begin emitting events that are consumed by the first Subscriber, while Subscribers started later miss out on the early events. This can be a problem if you want to be absolutely sure that all Subscribers receive a consistent view of the world. All of them will receive events in the same order, but unfortunately a Subscriber appearing late will lose the early notifications.

The solution to this problem is to publish() such an Observable first and make it possible for all of the components in your system to subscribe(), for example during application startup. When you are 100% sure that all Subscribers that need to receive the same sequence of events (including the initial event) have had a chance to subscribe(), call connect() on the ConnectableObservable. This creates a single Subscriber of the upstream Observable and begins pushing events to all downstream Subscribers.

Summary

Creating and subscribing to an Observable are essential features of RxJava. Beginners in particular tend to forget about subscription and are surprised that no events are emitted. Many developers focus on the amazing operators provided by this library, but failing to understand how these operators perform subscription underneath can cause subtle bugs.

Moreover, RxJava is often assumed to be asynchronous by nature, which is not really the case. As a matter of fact, most operators in RxJava do not use any particular thread pool. More precisely, this means that by default no concurrency is involved whatsoever and everything happens in the client thread. This is another important takeaway of this chapter. Now that you understand subscription and concurrency principles, you are ready to begin using RxJava painlessly and effectively.

(To Be Continued)