This chapter will guide you through the core concepts related to Reactive Extensions and RxJava. You will become very comfortable with Observable<T>, Observer<T>, and Subscriber<T> and a few helpful utility methods called operators.
Observable is the core API of RxJava, so make sure you understand how it works and what it represents.
Throughout this chapter, you will learn what
Observable really is and how to create it and interact with it. The knowledge you gain is essential to idiomatically provide and consume reactive APIs based on RxJava. RxJava was designed to ease the pain of asynchronous and event-driven programming, but you must understand some core principles and semantics in order to take advantage of that.
rx.Observable<T> represents a flowing sequence of values. It is the abstraction that you will use all of the time. Because these values often appear over a wide time range, we tend to think about an Observable as a stream of events. If you look around you will find many examples of streams:
- User interface events
- Bytes transferred over the network
- New orders in online shopping
- Posts on social-media websites
If you want to compare Observable<T> with something more familiar, Iterable<T> is probably the closest abstraction. Just like Iterator<T> created from Iterable<T>, Observable<T> can have zero to an infinite number of values of type T. Iterator is very effective at generating infinite sequences; for example, all natural numbers, as demonstrated here:
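A minimal sketch of such an infinite Iterator, using only the JDK. The class name NaturalNumbersIterator and the choice to start counting at 1 are assumptions made for illustration.

```java
import java.math.BigInteger;
import java.util.Iterator;

// Hypothetical class name, used only for illustration.
final class NaturalNumbersIterator implements Iterator<BigInteger> {

    private BigInteger current = BigInteger.ZERO;

    @Override
    public boolean hasNext() {
        return true; // the sequence never ends
    }

    @Override
    public BigInteger next() {
        current = current.add(BigInteger.ONE);
        return current; // 1, 2, 3, ...
    }
}
```

Nothing is stored beyond the current value, so the sequence can be infinite while memory use stays constant.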
Another similarity is the fact that Iterator itself can signal its client that it has no more items to produce (more on that later). Observable can produce an arbitrary number of events. However, just as Iterator does not need to be backed by an underlying collection, Observable does not necessarily need to represent a stream of events.
Observable<T> can actually produce three types of events:
- Values of type T, as declared by Observable<T>
- Completion event
- Error event
The specification of reactive extensions clearly states that every Observable can emit an arbitrary number of values optionally followed by completion or error (but not both). Strictly speaking, the Rx Design Guidelines define this rule as follows:
OnNext* (OnCompleted | OnError)? — where
OnNext represents a new event.
Additionally, you can implement an
Observable to never emit any event at all, including completion or error. Such an
Observable is useful for testing purposes—for example, to exercise timeouts.
An instance of
Observable does not emit any events until someone is actually interested in receiving them. To begin watching an
Observable, you use the
subscribe() family of methods:
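A minimal sketch of the single-callback form, assuming RxJava 1.x on the classpath. The tweets() factory and its String payload are stand-ins invented for illustration; a real tweet stream would come from elsewhere.

```java
import rx.Observable;

final class SubscribeSketch {

    // Stand-in for the tweets stream; the String payload is an assumption.
    static Observable<String> tweets() {
        return Observable.just("first tweet", "second tweet");
    }

    public static void main(String[] args) {
        // The callback runs once for every value pushed downstream.
        tweets().subscribe(tweet -> System.out.println(tweet));
    }
}
```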
The preceding code snippet subscribes to tweets
Observable by registering a callback. This callback will be invoked every time the tweets stream decides to push an event downstream. The RxJava contract makes sure that your callback will not be invoked from more than one thread at a time, even though events can be emitted from many threads.
There are multiple overloaded versions of
subscribe() that are more specific. We already mentioned that idiomatically
Observable does not throw exceptions. Instead, exceptions are just another type of notification (event) that
Observable can propagate. Therefore, you do not use the
try-catch block around
subscribe() to catch exceptions produced along the way. Instead, you provide a separate callback:
The second argument to
subscribe() is optional. It notifies about exceptions that can potentially occur while producing items. It is guaranteed that no other
Tweet will appear after the exception.
The third optional callback allows us to listen for stream completion:
Remember that RxJava is not opinionated with regard to how many items are produced, when, and when to stop. As a stream can be infinite or it can complete immediately upon subscription, it is up to the
Subscriber whether it wants to receive completion notification.
As a side note, often you can use Java 8 method references instead of lambdas to improve readability, as illustrated here:
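A sketch of the three-callback form of subscribe() written with method references, assuming RxJava 1.x. The class and its handler names (onTweet, onFailure, onDone) are hypothetical.

```java
import rx.Observable;
import java.util.ArrayList;
import java.util.List;

final class ThreeCallbacks {

    final List<String> log = new ArrayList<>();

    void onTweet(String tweet)  { log.add("tweet: " + tweet); }
    void onFailure(Throwable t) { log.add("error: " + t.getMessage()); }
    void onDone()               { log.add("done"); }

    // Subscribes with value, error, and completion callbacks.
    List<String> run(Observable<String> tweets) {
        tweets.subscribe(this::onTweet, this::onFailure, this::onDone);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(new ThreeCallbacks().run(Observable.just("a", "b")));
        System.out.println(new ThreeCallbacks().run(
                Observable.error(new IllegalStateException("boom"))));
    }
}
```

The first run records both values followed by completion; the second records only the error, because an error and a completion never occur together.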
It turns out that providing all three arguments to
subscribe() is quite useful, thus it would be helpful to have a simple wrapper holding all three callbacks. This is what
Observer<T> was designed for.
Observer<T> is a container for all three callbacks, receiving all possible notifications from
Observable<T>. Here is how you can register an Observer<T>:
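One possible shape of such a registration, assuming RxJava 1.x. The describe() helper is hypothetical and exists only to make the three notifications visible.

```java
import rx.Observable;
import rx.Observer;

final class ObserverSketch {

    // Subscribes an Observer and records every notification it receives.
    static String describe(Observable<String> source) {
        StringBuilder out = new StringBuilder();
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onNext(String value) {
                out.append(value).append(' ');
            }

            @Override
            public void onError(Throwable e) {
                out.append("error");
            }

            @Override
            public void onCompleted() {
                out.append("completed");
            }
        };
        source.subscribe(observer);
        return out.toString();
    }
}
```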
As a matter of fact, Observer<T> is the core abstraction for listening in RxJava. Yet if you want even greater control, Subscriber (Observer’s abstract implementation) is even more powerful.
Consider an Observer that knows in advance how many items it wants to receive or when to stop receiving them. For example, suppose we subscribed to stock price changes, but when the price falls below $1, we no longer want to listen. Obviously, just as an Observer has the ability to subscribe, it also should be capable of unsubscribing whenever it finds it suitable. There are two means to support that: Subscription and Subscriber.
Subscription is a handle that allows client code to cancel a subscription by using the
unsubscribe() method. Additionally, you can query the status of a subscription by using
isUnsubscribed(). It is important to unsubscribe from
Observable<T> as soon as you no longer want to receive more events; this avoids memory leaks and unnecessary load on the system.
The code example that follows subscribes to all events, but the subscriber itself decides to give up receiving notifications under certain criteria. Normally, this can be done by using the built-in
takeUntil() operator, but for the time being we can unsubscribe manually:
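A sketch of a Subscriber that gives up on its own, assuming RxJava 1.x. It borrows the stock price scenario mentioned earlier; the helper name and the Integer prices are assumptions.

```java
import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;

final class SelfUnsubscribing {

    // Collects prices until one falls below the threshold, then unsubscribes.
    static List<Integer> untilBelow(Observable<Integer> prices, int threshold) {
        List<Integer> seen = new ArrayList<>();
        prices.subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer price) {
                seen.add(price);
                if (price < threshold) {
                    unsubscribe(); // no further notifications are wanted
                }
            }

            @Override
            public void onError(Throwable e) { }

            @Override
            public void onCompleted() { }
        });
        return seen;
    }
}
```

For example, untilBelow(Observable.just(5, 3, 0, 7, 9), 1) stops after receiving 0, so 7 and 9 are never delivered.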
When a Subscriber decides it no longer wants to receive more items, it can unsubscribe itself. As an exercise, you can implement a Subscriber that receives only the first n events and then gives up. The Subscriber class is more powerful than that, but for the time being just remember it is capable of unsubscribing itself from an Observable.
Most of the time while working with RxJava you will be interacting with existing
Observables, typically combining, filtering, and wrapping them with one another. However, unless you work with an external API that already exposes
Observables, you first must learn where
Observables come from and how you can create a stream and handle subscriptions.
First, there are several factory methods that create fixed constant Observables. These are useful if you want to use RxJava consistently across an entire codebase or when values to be emitted are cheap to produce and known in advance:
- Observable.just(value): Creates an Observable instance that emits exactly one value to all future subscribers and completes afterward. Overloaded versions of the just() operator can take anything from two to nine values to be emitted.
- Observable.from(values): Similar to just() but accepts Iterable<T> or T[], thus creating Observable<T> with as many values emitted as elements in the values collection. Another overloaded version accepts a Future<T>, emitting an event when the underlying Future completes.
- Observable.range(from, n): Produces n integer numbers starting from from. For example, range(5, 3) will emit 5, 6, and 7 and then complete normally. Each subscriber will receive the same set of numbers.
- Observable.empty(): Completes immediately after subscription, without emitting any values.
- Observable.never(): Such an Observable never emits any notifications, neither values nor completion or error. This stream is useful for testing purposes.
- Observable.error(): Emits an onError() notification immediately to every subscriber. No other values are emitted and, according to the contract, onCompleted() cannot occur either.
Interestingly, even though RxJava is all about asynchronous processing of streams of events, the aforementioned factory methods by default operate on the client thread. Have a look at the following code sample:
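A sketch of such a sample, assuming RxJava 1.x. The line() helper, which prefixes each message with the current thread name, is hypothetical.

```java
import rx.Observable;
import java.util.ArrayList;
import java.util.List;

final class ClientThreadSketch {

    static List<String> run() {
        List<String> lines = new ArrayList<>();
        lines.add(line("Before"));
        Observable.range(5, 3).subscribe(i -> lines.add(line(i)));
        lines.add(line("After"));
        return lines;
    }

    // Prefixes a message with the name of the thread that produced it.
    static String line(Object msg) {
        return Thread.currentThread().getName() + ": " + msg;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Run from main, every line carries the main prefix and the three numbers appear between Before and After, which shows that no other thread was involved.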
What we are interested in is the thread that executed each log statement. The order of the statements is also relevant: the messages logged before and after subscribing are printed by the main client thread. However, notice that subscription also happened in the client thread and
subscribe() actually blocked the client thread until all events were received.
Unless required by some operator, RxJava does not implicitly run your code in any thread pool. To better understand this behavior, let’s study the low-level operator used for manufacturing Observables, create():
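A sketch of create() in action, assuming RxJava 1.x. The messages are collected in a list rather than printed with a thread prefix, and the class name is hypothetical. Later RxJava 1.x releases mark this overload of create() as deprecated, but it still behaves as described.

```java
import rx.Observable;
import rx.Subscriber;
import java.util.ArrayList;
import java.util.List;

final class CreateSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> ints = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                log.add("Create");
                subscriber.onNext(5);
                subscriber.onNext(6);
                subscriber.onNext(7);
                subscriber.onCompleted();
                log.add("Completed");
            }
        });
        log.add("Starting"); // nothing has been emitted so far
        ints.subscribe(i -> log.add("Element: " + i));
        log.add("Exit");
        return log;
    }
}
```

The recorded order is Starting, Create, Element: 5, Element: 6, Element: 7, Completed, Exit: the handler runs only once subscribe() is called, and it runs on the subscribing thread.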
To understand how
Observable.create() works and how RxJava deals with concurrency, we will analyze the execution step by step. First, we create ints
Observable by supplying an implementation of the
OnSubscribe callback interface to
create() (later, we almost always replace it with a simple lambda expression). At this point, nothing happened yet apart from creating an instance of
Observable; therefore, the first line of output we see is main: Starting.
Observable defers emission of events by default, meaning it will not begin to emit any items until you actually subscribe, so the lambda expression given to create() is not executed yet. Later, we call subscribe(), which forces the Observable to begin emitting items.
This is true for streams known as cold. Hot streams, on the other hand, emit events even if no one subscribed.
Observable.create() is so versatile that in fact you can mimic all of the previously discovered factory methods on top of it. For example, Observable.just(x), which emits a single value x and immediately completes afterward, might look like this:
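One way to write it, assuming RxJava 1.x. The enclosing class name is hypothetical.

```java
import rx.Observable;

final class JustViaCreate {

    // Mimics Observable.just(x): one value, then completion.
    static <T> Observable<T> just(T x) {
        return Observable.create(subscriber -> {
            subscriber.onNext(x);
            subscriber.onCompleted();
        });
    }
}
```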
Emitting does not begin until we actually subscribe. But every time
subscribe() is called, our subscription handler inside
create() is invoked. This is neither an advantage nor a disadvantage; it is just something you must keep in mind.
In some cases, the fact that every subscriber gets its own unique handler invocation works great. For example,
Observable.just(42) should emit
42 to every subscriber, not just the first one. On the other hand, if you put a database query or heavyweight computation inside
create(), it might be beneficial to share a single invocation among all subscribers.
To ensure that you truly understand how subscription works, consider the following code sample that subscribes to the same Observable twice:
Remember that every time you subscribe to an
Observable created via the
create() factory method, the lambda expression passed as an argument to
create() is executed independently by default within the thread that initiated the subscription:
If you would like to avoid calling create() for each subscriber and simply reuse events that were already computed, there exists a handy cache() operator. What cache() does is stand between
subscribe() and our custom
Observable. When the first subscriber appears,
cache() delegates subscription to the underlying
Observable and forwards all notifications (events, completions, or errors) downstream. However, at the same time, it keeps a copy of all notifications internally. When a subsequent subscriber wants to receive pushed notifications,
cache() no longer delegates to the underlying
Observable but instead feeds cached values.
With caching, the output for two
Subscribers is quite different:
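The difference can be made measurable with a small sketch, assuming RxJava 1.x. The helper counts how often the create() handler runs for two subscribers, with and without cache(); its name is hypothetical.

```java
import rx.Observable;
import java.util.concurrent.atomic.AtomicInteger;

final class CacheSketch {

    // Returns how many times the create() handler ran for two subscribers.
    static int handlerInvocations(boolean cached) {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> ints = Observable.create(subscriber -> {
            calls.incrementAndGet();
            subscriber.onNext(42);
            subscriber.onCompleted();
        });
        Observable<Integer> source = cached ? ints.cache() : ints;
        source.subscribe();
        source.subscribe();
        return calls.get();
    }
}
```

Without cache() the handler runs twice, once per subscriber; with cache() it runs once and the second subscriber is served from the stored notifications.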
Of course, you must keep in mind that cache() plus infinite stream is the recipe for a disaster, also known as OutOfMemoryError.
Infinite data structures are an important concept. Computer memory is finite, so having an infinite list or stream sounds impossible. But RxJava allows you to produce and consume events on the fly. A traditional queue can be treated as an infinite source of values, despite not keeping all of them in memory at the same time. That being said, how would you implement such an infinite stream by using
create()? For example, let’s build an
Observable that produces all natural numbers:
The presence of
while(true) should trigger an alarm bell in any codebase. It seems OK at first, but you should quickly realize that this implementation is broken, though not because it is infinite. As a matter of fact, infinite Observables are perfectly OK and quite useful, as long as they are implemented properly. The moment you hit
subscribe(), the lambda expression inside
create() is invoked in the context of your thread. And because this lambda never ends,
subscribe() blocks infinitely as well.
You might ask, “Shouldn’t subscription be asynchronous rather than running the subscription handler in the client thread?” This is a valid question, so let’s spend some time introducing explicit concurrency:
Rather than have a blocking loop running directly in the client thread, we spawn a custom thread and emit events directly from there. Luckily,
subscribe() no longer blocks the client thread, because all it does underneath is spawn a thread.
Now imagine we are not interested in all the natural numbers (there are too many of them after all), but just the first few. We know already how to stop receiving notifications from
Observable — by unsubscribing:
If you pay attention to details, you probably noticed the suspicious-looking
while(true) loop was replaced with the following:
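A sketch of the whole factory with the replaced loop condition, assuming RxJava 1.x. The class name and the choice to start at zero are assumptions, and spawning a raw thread is shown only because the text discusses it.

```java
import rx.Observable;
import rx.Subscription;
import java.math.BigInteger;

final class NaturalNumbersSketch {

    static Observable<BigInteger> naturalNumbers() {
        return Observable.create(subscriber -> {
            Runnable emitter = () -> {
                BigInteger i = BigInteger.ZERO;
                // Stop looping as soon as the subscriber is gone.
                while (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(i);
                    i = i.add(BigInteger.ONE);
                }
            };
            new Thread(emitter).start();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Subscription subscription = naturalNumbers().subscribe(n -> { });
        Thread.sleep(10);
        subscription.unsubscribe(); // the emitting thread leaves its loop
    }
}
```

Because the loop checks isUnsubscribed() before every emission, the background thread terminates shortly after unsubscribe() is called instead of spinning forever.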
Although creating your own thread is not a good design decision, and RxJava has much better declarative tools for handling concurrency, the preceding code sample shows how you can properly handle subscription events.
Handling unsubscription immediately before trying to send an event is fine as long as events are pushed relatively often. But imagine a situation in which events appear very rarely.
Observable can only determine that a subscriber unsubscribed when it attempts to push some event to it.
Take the following useful factory method as an example:
delayed(x) creates an
Observable that emits value
x after sleeping for 10 seconds. It is similar to
Observable.just(), but with extra delay. We know already that an extra thread needs to be used, even though it is not the best usage pattern:
The naive implementation spawns a new thread and goes to sleep for 10 seconds. A more robust implementation should at least use
java.util.concurrent.ScheduledExecutorService, but this is for educational purposes only. After 10 seconds we ensure that someone is still listening, and if that is the case, we emit a single item and complete.
But what if the
subscriber decides to unsubscribe one second after subscribing, long before the event is supposed to be emitted? Well, nothing really. The background thread sleeps for the remaining nine seconds just to realize the
subscriber is long gone. This is what bothers us; holding the resource for an extra nine seconds seems wasteful.
Luckily, with a
subscriber instance we can be notified as soon as it unsubscribes, cleaning up resources as soon as possible, not when the next message appears:
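A sketch of this idea, assuming RxJava 1.x. The delay and unit parameters are additions made so the example can be tried with short pauses (the text uses a fixed 10 seconds), and the sleep() helper is hypothetical.

```java
import rx.Observable;
import rx.subscriptions.Subscriptions;
import java.util.concurrent.TimeUnit;

final class DelayedSketch {

    // The delay parameters are an addition for illustration only.
    static <T> Observable<T> delayed(T x, long delay, TimeUnit unit) {
        return Observable.create(subscriber -> {
            Runnable r = () -> {
                sleep(delay, unit);
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(x);
                    subscriber.onCompleted();
                }
            };
            Thread thread = new Thread(r);
            thread.start();
            // Interrupt the sleeping thread the moment the subscriber leaves.
            subscriber.add(Subscriptions.create(thread::interrupt));
        });
    }

    // Sleeps, swallowing the interruption so the caller can exit gracefully.
    static void sleep(long delay, TimeUnit unit) {
        try {
            unit.sleep(delay);
        } catch (InterruptedException ignored) {
            // woken up early because the subscriber unsubscribed
        }
    }
}
```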
The last line is crucial, but everything else remained the same. The background thread is already running or, to be precise, sleeping for 10 seconds. But just after spawning a thread, we ask the subscriber to let us know if it unsubscribes by invoking a callback registered via Subscriber.add(). This callback has basically a single purpose: to interrupt a thread. What Thread.interrupt() does is throw an InterruptedException inside sleep(), prematurely interrupting our 10-second pause. sleep() exits gracefully after swallowing the exception. However, at this point the check !subscriber.isUnsubscribed() evaluates to false and no event is emitted. The thread stops immediately and no resources are wasted. You can use the same pattern to perform any cleanup. However, if your stream produces a steady, frequent flow of events, you can probably live without an explicit callback.
There is another reason why you should not use explicit threads inside
create(). The Rx Design Guidelines, in section 4.2 (“Assume observer instances are called in a serialized fashion”), require that subscribers never receive notifications concurrently. It is easy to violate this requirement when explicit threads are involved.
Such an assumption allows writing
Observers as if they were synchronized, always accessed by at most one thread. This holds true despite events that can come from multiple threads. Custom implementations of
Observable must ensure that this contract is met. With that in mind, look at the following code that nonidiomatically tries to parallelize loading of multiple chunks of data:
This code, apart from being needlessly complex, violates some Rx principles. First, it allows calling the
onNext() method from multiple threads concurrently. Second, you can avoid the complexity by simply applying idiomatic RxJava operators, such as
flatMap(). The good news is that even if someone poorly implemented the
Observable, we can easily fix it by applying the
serialize() operator, such as
loadAll(...).serialize(). This operator ensures that events are serialized and sequenced. It also enforces that no more events are sent after completion or error.
The last aspect of creating
Observables that we have not yet covered is error propagation. We’ve learned so far that
Observer<T> can receive values of type
T, optionally followed by either completion or error. But how do you push errors downstream to all subscribers? It is a good practice to wrap entire expressions within create() in a try-catch block. Throwables should be propagated downstream rather than logged or rethrown, as demonstrated here:
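A sketch of this pattern, assuming RxJava 1.x. The load() function, its String result, and its failure on negative identifiers are all invented for illustration.

```java
import rx.Observable;

final class RxLoadSketch {

    // Hypothetical blocking call that may fail.
    static String load(int id) {
        if (id < 0) {
            throw new IllegalArgumentException("negative id: " + id);
        }
        return "data-" + id;
    }

    static Observable<String> rxLoad(int id) {
        return Observable.create(subscriber -> {
            try {
                subscriber.onNext(load(id));
                subscriber.onCompleted();
            } catch (Exception e) {
                // Push the failure downstream instead of logging or rethrowing.
                subscriber.onError(e);
            }
        });
    }
}
```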
The try-catch block is necessary to propagate the possible
Exception thrown from, for example,
load(id). Otherwise, RxJava will do its best to at least print the exception to standard output, but to build resilient streams, exceptions need to be treated as first-class citizens, not just as extra features in the language that no one truly understands.
The pattern of completing an
Observable with one value and wrapping with the
try-catch statement is so prevalent that the built-in
fromCallable() operator was introduced:
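A sketch of the shorter form, assuming a RxJava 1.x release that includes fromCallable(). The load() function is the same invented stand-in as before.

```java
import rx.Observable;

final class FromCallableSketch {

    // Hypothetical blocking call that may fail.
    static String load(int id) {
        if (id < 0) {
            throw new IllegalArgumentException("negative id: " + id);
        }
        return "data-" + id;
    }

    // Emits the result of load(id), or an error if load(id) throws.
    static Observable<String> rxLoad(int id) {
        return Observable.fromCallable(() -> load(id));
    }
}
```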
It is semantically equivalent but much shorter and has some other benefits over
create() that you will discover later.
In later chapters, we will explore schedulers, but first let’s discover two very useful operators that use threads underneath: timer() and interval(). The former simply creates an Observable that emits a long value of zero after a specified delay and then completes:
As silly as it sounds,
timer() is extremely useful. It is basically an asynchronous equivalent of
Thread.sleep(). Rather than blocking the current thread, we create an Observable and subscribe() to it. It will become significantly more important after we learn how to compose simple
Observables into more complex computations. The fixed value of
0 (in variable zero) is just a convention without any specific meaning. However, it makes more sense when
interval() is introduced.
interval() generates a sequence of long numbers, beginning with zero, with a fixed delay between each one of them:
Observable.interval() produces a sequence of consecutive long numbers, beginning with 0. However, unlike range(), interval() places a fixed delay before every event, including the first one.
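Both operators can be tried with a short sketch, assuming RxJava 1.x. The delays are arbitrary, and the blocking calls are used only so that the results can be returned from a plain method.

```java
import rx.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class TimerAndInterval {

    // timer() emits a single 0L after the delay and then completes.
    static Long timerValue() {
        return Observable.timer(100, TimeUnit.MILLISECONDS)
                .toBlocking()
                .single();
    }

    // interval() never completes on its own, so take(n) bounds it here.
    static List<Long> firstTicks(int n) {
        return Observable.interval(50, TimeUnit.MILLISECONDS)
                .take(n)
                .toList()
                .toBlocking()
                .single();
    }
}
```

timerValue() returns 0 and firstTicks(3) returns the list 0, 1, 2, each value arriving one delay after the previous one.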
After you get an instance of
Observable, it is important to understand whether the stream is hot or cold. The API and semantics remain the same, but the way you use
Observable will depend on the type.
A cold Observable is entirely lazy and never begins to emit events until someone is actually interested. If there are no observers, Observable is just a static data structure. This also implies that every subscriber receives its own copy of the stream because events are produced lazily but also not likely cached in any way. Cold Observables typically come from Observable.create(), which idiomatically should not start any logic but instead postpone it until someone actually listens. A cold Observable is thus somewhat dependent on Subscriber. Examples of cold Observables, apart from create(), include Observable.just(), from(), and range(). Subscribing to a cold
Observable often involves a side effect happening inside
create(). For example, the database is queried or a connection is opened.
Hot Observables are different. After you get a hold of such an Observable, it might already be emitting events no matter how many Subscribers it has. A hot Observable pushes events downstream, even if no one listens and events are possibly missed. Whereas typically you have full control over cold Observables, hot Observables are independent from consumers. When a
Subscriber appears, a hot
Observable behaves like a wire tap, transparently publishing events flowing through it. The presence or absence of
Subscriber does not alter the behavior of
Observable; it is entirely decoupled and independent.
Hot Observables typically occur when we have absolutely no control over the source of events. Examples of such
Observables include mouse movements, keyboard inputs, or button clicks. So far, we haven’t even mentioned the user interface, but it turns out that RxJava fits perfectly when implementing user interfaces. This library is especially appreciated in the Android community, where it helps in transforming from nested callbacks to flat composition of streams.
The importance of hot versus cold distinction becomes essential when we rely on delivery of events. No matter when you subscribe to a cold
Observable—immediately or after hours—you always receive a complete and consistent set of events. On the other hand, if the
Observable is hot, you can never be sure you received all events from the beginning.
Another interesting distinction that comes to mind between hot and cold sources is time dependency. A cold
Observable produces values on demand and possibly multiple times so the exact instant when an item was created is irrelevant. Conversely, hot
Observables represent events as they come, typically from some external source. This means that the instant when a given value was generated is very significant because it places the event on the timescale.
The most striking example of a callback-based API is almost every graphical user interface library out there: for example, Swing. When various listeners like
onKeyUp() are used, callbacks are certainly inevitable. If you’ve worked in such environments, the term callback hell is definitely familiar to you. Callbacks have a tendency to nest in one another, so coordinating multiple callbacks is virtually impossible. Here is an example of a callback nested four times:
The simplest requirements, like reacting when two callbacks are invoked shortly after each other, become a nightmare, and are additionally hindered by multithreading. In this section, we will refactor a callback-based API into RxJava with all the benefits such as controlling threads, lifecycle, and cleanup.
For the purpose of this exercise, we will use the open source Twitter4J library that can push a subset of new tweets using a callback-based API. Twitter4J was chosen as a good example of an API using callbacks with an interesting domain. The simplest working example of reading tweets in real-time might look like this:
twitterStream.sample() starts a background thread that logs in to Twitter and awaits new messages. Every time a tweet appears, the
onStatus callback is executed. Execution can jump between threads, therefore we can no longer rely on throwing exceptions. Instead the
onException() notification is used.
Overall, it does not look that bad; the problem is that this program is not doing anything. In real life, you would probably process each
Status message (tweet) somehow. For example, save it to a database or feed a machine-learning algorithm. You can technically put that logic inside the callback, but this couples the infrastructural call with the business logic. Simple delegation to a separate class is better, but unfortunately not reusable. What we really want is clean separation between the technical domain (consuming data from an HTTP connection) and the business domain (interpreting input data). So we build a second layer of callbacks:
By adding this one extra level of abstraction we can now reuse the
consume() method in various ways. Imagine that instead of logging you have persistence, analytics, or fraud detection:
But we just shifted the problem up in the hierarchy. What if we want to count the number of tweets per second? Or consume just the first five? And what if we would like to have multiple listeners? Each of these situations opens a new HTTP connection. Last but not least, this API does not allow unsubscribing when we are done, risking a resource leak.
We hope you realize that we are heading toward an Rx-powered API. Rather than passing callbacks down to the place where they can be executed, we can return an
Observable<Status> and let everyone subscribe whenever they want. However, keep in mind that the following implementation still opens a new network connection for each Subscriber:
At this point, we can simply call
observe(), which only creates an
Observable and does not contact the external server. We learned that unless someone actually subscribes, the contents of
create() are not executed. The subscription is very similar:
The big difference here, compared to
consume(...), is that we are not forced to pass callbacks as arguments to
observe(). Instead, we can return
Observable<Status>, pass it around, store it somewhere, and use it whenever and wherever we feel like it is needed. One important aspect that we have not covered is resource clean-up. When someone unsubscribes, we should shut down
TwitterStream to avoid resource leak. We already know two techniques for that; let’s use the simpler one first:
When someone subscribes only to receive a small fraction of the stream, our
Observable will make sure to clean up the resources. We know a second technique to implement clean-up that does not require waiting for an upstream event. The moment a subscriber unsubscribes, we call
shutdown() immediately, rather than waiting for the next tweet to come just to trigger clean-up behavior (last line):
This Observable blurs the difference between hot and cold streams. On one hand, it represents external events that appear without our control (hot behavior). On the other hand, events will not begin flowing (no underlying HTTP connection) to our system until we actually subscribe.
One more side effect that we forgot about is still creeping in: every new
subscribe() will start a new background thread and new connection to an external system. The same instance of
Observable<Status> should be reusable across many subscribers, and because
Observable is lazy, you should technically be able to call
observe() once upon startup and keep it in some singleton. But the current implementation simply opens a new connection, effectively fetching the same data multiple times from the network, for each Subscriber.
We certainly want to register multiple
Subscribers of that stream, but there is no reason why every
Subscriber is supposed to fetch the same data independently. What we really want is a pub-sub behavior wherein one publisher (external system) delivers data to multiple
Subscribers. In theory, the
cache() operator can do that, but we don’t want to buffer old events forever. We will now explore some solutions to this problem.
Manually keeping track of all subscribers and shutting down the connection to the external system only when all subscribers leave is a Sisyphean task that we will implement anyway, just to appreciate idiomatic solutions later on. The idea is to keep track of all subscribers in some sort of
Set<Subscriber<Status>> and start/shut down the external system connection when it becomes nonempty/empty:
The subscribers set thread-safely stores a collection of currently subscribed
Observers. Every time a new
Subscriber appears, we add it to a set and connect to the underlying source of events lazily. Conversely, when the last
Subscriber disappears, we shut down the upstream source.
The key here is to always have exactly one connection to the upstream system rather than one connection per subscriber. This works and is quite robust; however, the implementation seems too low-level and error-prone. Access to the
subscribers set must be
synchronized, but the collection itself must also support safe iteration. Calling
register() must appear before adding the
deregister() callback; otherwise, the latter can be called before we register.
There must be a better way to implement such a common scenario of multiplexing a single upstream source to multiple Observers — luckily, there are at least two such mechanisms. RxJava is all about reducing such dangerous boilerplate and abstracting away concurrency.
The Subject class is quite interesting because it extends
Observable and implements
Observer at the same time. What that means is that you can treat it as
Observable on the client side (subscribing to upstream events) and as
Observer on the provider side (pushing events downstream on demand by calling
onNext() on it).
Typically, what you do is keep a reference to
Subject internally so that you can push events from any source you like but externally expose this Subject as an Observable.
Let’s reimplement streaming
Status updates using
Subject. To further simplify implementation, we connect to the external system eagerly and do not keep track of subscribers. Apart from simplifying our example, this has the benefit of smaller latency when the first
Subscriber appears. Events are already flowing, so we don’t need to wait to reconnect to some third-party application:
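A sketch of the Subject-based bridge, assuming RxJava 1.x. To stay self-contained it does not use Twitter4J: the class name, the String payload, and the onStatus()/onException() entry points are stand-ins for whatever the callback-based API would invoke.

```java
import rx.Observable;
import rx.subjects.PublishSubject;

// Hypothetical bridge; in the text the events come from Twitter4J callbacks.
final class StatusBridgeSketch {

    private final PublishSubject<String> subject = PublishSubject.create();

    // Called by the callback-based API for every new status.
    void onStatus(String status) {
        subject.onNext(status);
    }

    // Called by the callback-based API when something goes wrong.
    void onException(Exception e) {
        subject.onError(e);
    }

    // Clients only ever see a plain Observable.
    Observable<String> observe() {
        return subject;
    }
}
```

A subscriber that arrives late receives only the events pushed after it subscribed, which is the hot behavior described above.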
PublishSubject is one of the flavors (subclasses) of Subject. We eagerly begin receiving events from the upstream system and simply push them (by calling subject.onNext(...)) to all Subscribers. Subject keeps track of them internally so that we no longer need to.
Notice how we simply return the subject from observe(), pretending it is a plain
Observable. Now when someone subscribes, the
Subscriber will receive all subsequent events immediately after
onNext() is called on the backend — at least until it unsubscribes.
Subject is a useful tool for creating
Observable instances when
Observable.create(...) seems too complex to manage. Other types of
Subjects include the following:
- AsyncSubject: Remembers the last emitted value and pushes it to subscribers when onCompleted() is called. As long as AsyncSubject has not completed, events except the last one are discarded.
- BehaviorSubject: Pushes all events emitted after subscription happened, just like PublishSubject. However, first it emits the most recent event that occurred just before subscription. This allows a Subscriber to be immediately notified about the state of the stream. For example, a Subject may represent the current temperature broadcast every minute. When a client subscribes, it will receive the last seen temperature immediately rather than waiting several seconds for the next event. But the same Subscriber is not interested in historical temperatures, only the last one. If no events have yet been emitted, a special default event is pushed first (if provided).
- ReplaySubject: The most interesting type of Subject, which caches events pushed through the entire history. If someone subscribes, it first receives a batch of missed (cached) events and only later events in real time. By default, all events since the creation of this Subject are cached. This can become dangerous if the stream is infinite or very long. In that case, there are overloaded versions of ReplaySubject that keep only the following:
  - Configurable number of events in memory (createWithSize())
  - Configurable time window of most recent events (createWithTime())
  - Or even constrain both size and time (whichever limit is reached first) with createWithTimeAndSize()
Subjects should be treated with caution: often there are more idiomatic ways of sharing subscriptions and caching events—for example, see
ConnectableObservable. For the time being, prefer relatively low-level
Observable.create() or, even better, consider the standard factory methods described earlier.
ConnectableObservable is an interesting way of coordinating multiple
Subscribers and sharing a single underlying subscription.
ConnectableObservable is a type of Observable that ensures there exists at most one Subscriber at all times, but in reality there can be many of them sharing the same underlying resource.
There are many applications of
ConnectableObservable; for example, making sure all Subscribers receive the same sequence of events regardless of when they subscribed.
ConnectableObservable can also force subscription if it generates important side effects, even when no “real”
Subscriber has appeared yet.
Subjects are imperative ways of creating Observables, whereas ConnectableObservable shields the original upstream
Observable and guarantees at most one
Subscriber reaches it. No matter how many
Subscribers connect to
ConnectableObservable, it opens just one subscription to the
Observable from which it was created.
Let us recap: we have a single handle to the underlying resource; for example, HTTP connection to stream of Twitter status updates. However, an
Observable pushing these events will be shared among multiple
Subscribers. The naive implementation of this
Observable created earlier had no control over this; therefore, each
Subscriber started its own connection. This is quite wasteful:
When we try to use this
Observable, each
Subscriber establishes a new connection, like so:
Here is the output:
This time, to simplify, we use a parameterless
subscribe() method that triggers subscription but drops all events and notifications. After spending almost half of the chapter fighting with this problem and familiarizing ourselves with plenty of RxJava features, we can finally introduce the most scalable and simplest solution: the
publish().refCount() pair:
The output is much like what we expect:
The connection is not established until we actually get the first Subscriber. But, more important, the second
Subscriber does not initiate a new connection; it does not even touch the original
Observable. The
publish().refCount() tandem wrapped the underlying
Observable and intercepted all subscriptions.
What
refCount() does is basically count how many active Subscribers we have at the moment, much like reference counting in historic garbage-collection algorithms. When this number goes from zero to one, it subscribes to the upstream
Observable. Every number above one is ignored and the same upstream
Subscriber is simply shared between all downstream
Subscribers. However, when the very last downstream
Subscriber unsubscribes, the counter drops from one to zero and
refCount() knows it must unsubscribe right away.
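The counting described above can be pictured with a small plain-Java model. This is only a sketch of the idea, not RxJava's implementation of refCount(); RefCountedSource, its nested Upstream interface, and RefCountDemo are hypothetical names introduced for illustration, and the Upstream callbacks stand in for opening and closing something like an HTTP connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of reference-counted sharing; NOT RxJava's implementation.
class RefCountedSource<T> {
    /** Stands in for the expensive upstream resource, e.g. an HTTP connection. */
    interface Upstream {
        void connect();
        void disconnect();
    }

    private final Upstream upstream;
    private final List<Consumer<T>> downstream = new ArrayList<>();

    RefCountedSource(Upstream upstream) {
        this.upstream = upstream;
    }

    /** Registers a downstream listener and returns a handle that unsubscribes it. */
    synchronized Runnable subscribe(Consumer<T> listener) {
        downstream.add(listener);
        if (downstream.size() == 1) {
            upstream.connect();        // 0 -> 1: open the single shared connection
        }
        return () -> unsubscribe(listener);
    }

    private synchronized void unsubscribe(Consumer<T> listener) {
        if (downstream.remove(listener) && downstream.isEmpty()) {
            upstream.disconnect();     // 1 -> 0: release the shared connection
        }
    }

    /** Fans one upstream event out to every current listener. */
    synchronized void emit(T value) {
        for (Consumer<T> listener : new ArrayList<>(downstream)) {
            listener.accept(value);
        }
    }
}

class RefCountDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        RefCountedSource<String> source = new RefCountedSource<>(
            new RefCountedSource.Upstream() {
                public void connect()    { log.add("Establishing connection"); }
                public void disconnect() { log.add("Disconnecting"); }
            });
        Runnable first = source.subscribe(s -> log.add("first got " + s));
        Runnable second = source.subscribe(s -> log.add("second got " + s)); // no new connection
        source.emit("tweet");
        first.run();    // one listener left, connection stays open
        second.run();   // last listener gone, connection closed
        return log;
    }
}
```

In this model the connection is opened exactly once, when the first listener arrives, and closed exactly once, when the last one leaves; the second listener never touches the upstream.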
publish().refCount() is used very frequently and therefore has an alias named
share(). Keep in mind that if unsubscription is shortly followed by subscription,
share() still performs reconnection, as if there were no caching at all.
Another valuable use case of the
publish() operator is forcing subscription in the absence of any
Subscriber. Imagine that we have our
Observable<Status>. Before we expose it to our clients we want to store each event in the database, regardless of whether someone is subscribed. A naive approach is not sufficient:
We are using the
doOnNext() operator that peeks at every item that flows through the stream and performs some action, like
saveStatus(). However, remember that
Observables are lazy by design; therefore, as long as no one has subscribed,
doOnNext() is not triggered.
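The laziness at play here can be illustrated without RxJava. The following plain-Java sketch models a source whose doOnNext() merely wraps the subscription logic and therefore does nothing until subscribe() is called. LazySource and LazyDemo are hypothetical names, and this is an assumption-laden model rather than how rx.Observable is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of a lazy stream; NOT RxJava's implementation.
class LazySource<T> {
    // What to do when someone subscribes: push events into the given sink.
    private final Consumer<Consumer<T>> onSubscribe;

    LazySource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** Returns a new source that runs the action on each item; nothing executes yet. */
    LazySource<T> doOnNext(Consumer<T> action) {
        return new LazySource<T>(sink -> onSubscribe.accept(value -> {
            action.accept(value);   // side effect first
            sink.accept(value);     // then pass the item downstream
        }));
    }

    /** Only here does the source actually start producing events. */
    void subscribe(Consumer<T> sink) {
        onSubscribe.accept(sink);
    }
}

class LazyDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySource<String> tweets = new LazySource<String>(sink -> {
            sink.accept("first");
            sink.accept("second");
        });
        // Declaring the side effect does not trigger it.
        LazySource<String> saved = tweets.doOnNext(status -> log.add("saved " + status));
        log.add("subscribing");
        saved.subscribe(status -> { });   // an empty listener is enough to start the flow
        return log;
    }
}
```

The log records "subscribing" before any "saved" entry, which mirrors why a side-effecting operator on its own never stores anything until some subscriber, even an empty one, appears.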
What we want is a fake
Observer that does not really listen to events but forces upstream
Observables to produce events. There is actually an overloaded version of
subscribe() that does exactly this:
Such an empty
Subscriber in the end invokes
Observable.create() and connects to the upstream source of events. This seems to solve the problem, but we again forgot to protect ourselves from multiple subscribers. If we expose
tweets outside, the second subscriber will make a second attempt to connect to the external resource—for example, opening a second HTTP connection.
The idiomatic solution is to use the
publish().connect() duet, which creates an artificial
Subscriber immediately while keeping just one upstream
Subscriber. This is best explained with an example.
Anyone who subscribes to
ConnectableObservable is placed in a set of
Subscribers. As long as
connect() is not called, these
Subscribers are put on hold; they never directly subscribe to the upstream
Observable. However, when
connect() is called, a dedicated mediating
Subscriber subscribes to the upstream
Observable (
tweets), no matter how many downstream subscribers appeared before, even if there were none. But if some
Subscribers of the
ConnectableObservable were put on hold, they will all receive the same sequence of notifications.
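The hold-then-connect mechanism can be sketched in plain Java as follows. This is a simplified model of the behavior described above, not RxJava's ConnectableObservable; ConnectableSource and ConnectDemo are hypothetical names, and the upstream is represented by a function that pushes events into whatever sink it is given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of hold-then-connect; NOT RxJava's implementation.
class ConnectableSource<T> {
    // "Subscribing" to the upstream means handing it a sink to push events into.
    private final Consumer<Consumer<T>> upstream;
    private final List<Consumer<T>> held = new ArrayList<>();
    private boolean connected;

    ConnectableSource(Consumer<Consumer<T>> upstream) {
        this.upstream = upstream;
    }

    /** Parks the subscriber; the upstream is not touched. */
    void subscribe(Consumer<T> subscriber) {
        held.add(subscriber);
    }

    /** Subscribes one mediating sink to the upstream, even if nobody is waiting. */
    void connect() {
        if (connected) {
            return;                  // at most one upstream subscription
        }
        connected = true;
        upstream.accept(value -> {
            for (Consumer<T> subscriber : new ArrayList<>(held)) {
                subscriber.accept(value);
            }
        });
    }
}

class ConnectDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableSource<Integer> published = new ConnectableSource<Integer>(sink -> {
            log.add("upstream subscribed");
            sink.accept(1);
            sink.accept(2);
        });
        published.subscribe(v -> log.add("A:" + v));
        published.subscribe(v -> log.add("B:" + v));
        log.add("before connect");   // both subscribers are still on hold
        published.connect();         // single upstream subscription, events fan out
        return log;
    }
}
```

Both parked subscribers receive the same sequence starting from the very first event, and the upstream is subscribed to only once, at the moment connect() is called.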
This mechanism has multiple advantages. Imagine that you have an
Observable in your application in which multiple
Subscribers are interested. On startup, several components (e.g., Spring beans or EJBs) subscribe to that
Observable and begin listening. Without
ConnectableObservable, it is very likely that a hot
Observable will begin emitting events that will be consumed by the first
Subscriber, but
Subscribers started later will miss out on the early events. This can be a problem if you want to be absolutely sure that all
Subscribers receive a consistent view of the world. All of them will receive events in the same order; unfortunately, a
Subscriber appearing late will lose early notifications.
The solution to this problem is to
publish() such an
Observable first and make it possible for all of the components in your system to
subscribe(); for example, during application startup. When you are 100% sure that all
Subscribers that need to receive the same sequence of events (including the initial event) had a chance to
subscribe(), connect such a
ConnectableObservable with
connect(). This will create a single
Subscriber in the upstream
Observable and begin pushing events to all downstream
Subscribers.
Creating and subscribing to
Observable are essential features of RxJava. Beginners especially tend to forget about subscription and are then surprised that no events are emitted. Many developers focus on the amazing operators provided by this library, but failing to understand how these operators perform subscription underneath can cause subtle bugs.
Moreover, RxJava is often assumed to be asynchronous by nature, which is not really the case. As a matter of fact, most operators in RxJava do not use any particular thread pool. More precisely, this means that by default no concurrency is involved whatsoever and everything happens in the client thread. This is another important takeaway of this chapter. Now that you understand subscription and concurrency principles, you are ready to begin using RxJava painlessly and effectively.