So far, we’ve become very familiar with the push-based nature of RxJava. Events are produced somewhere up in the stream to be consumed later by all subscribers. We never really paid much attention to what happens if an Observer is slow and cannot keep up with events emitted from within
Observable.create(). This entire chapter is devoted to this problem.
RxJava has two ways of dealing with producers being more active than subscribers:
- Various flow-control mechanisms such as sampling and batching are implemented via built-in operators
- Subscribers can propagate their demand and request only as many items as they can process by using a feedback channel known as backpressure
Before RxJava began implementing backpressure (see the section “Backpressure”), dealing with producers (
Observables) outperforming consumers (
Observers) was a difficult task. Quite a few operators were invented to deal with producers pushing too many events, and most of them are quite interesting on their own. Some are useful for batching events; others drop some events. This section walks you through these operators, including some examples.
There are cases for which you definitely want to receive and process every single event pushed from the upstream
Observable. But, there are some scenarios for which periodic sampling is enough. The most obvious case is receiving measurements from some device; for example, temperature. The frequency at which the device produces new measurements is often irrelevant for us, especially when the measurements appear often but are very similar to one another. The
sample() operator looks at the upstream
Observable periodically (for example, every second) and emits the last encountered event. If there were no events at all in the last one-second period, no sample is forwarded downstream and the next sample is taken after another second, as illustrated in this example:
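In RxJava 1.x, such a pipeline can be sketched as follows (the 7-millisecond interval matches the discussion below; the `timestamp()` bookkeeping is only there to print relative times):

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

long startTime = System.currentTimeMillis();
Observable
        .interval(7, TimeUnit.MILLISECONDS)
        .timestamp()
        .sample(1, TimeUnit.SECONDS)
        .map(ts -> (ts.getTimestampMillis() - startTime) + "ms: " + ts.getValue())
        .take(5)
        .subscribe(System.out::println);
```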
The preceding code snippet will print something similar to the following:
The first column shows the relative time from subscription to sample emission. You can clearly see that the first sample appears a little over one second after subscription (as requested by the
sample() operator) and subsequent samples follow roughly one second after one another. More importantly, notice what the values are. The
interval() operator emits natural numbers starting from zero every seven milliseconds. Thus, by the time the first sample is taken, we can expect about 142 (1,000/7) events to have appeared, where the 142nd value is 141 (counting from zero).
Let’s explore an example that’s a little more complex. Imagine that you have a list of names that appear with some absolute delays, like so:
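A sketch of such a stream (the concrete names and delay values are illustrative; any data with a similar gap between 2 and 3 seconds exhibits the same behavior):

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

Observable<String> names = Observable
        .just("Mary", "Patricia", "Linda", "Barbara",
              "Elizabeth", "Jennifer", "Maria", "Susan",
              "Margaret", "Dorothy");
Observable<Long> absoluteDelayMillis = Observable
        .just(0.1, 0.6, 0.9, 1.1, 3.3, 3.4, 3.5, 3.6, 4.4, 4.8)
        .map(d -> (long) (d * 1_000));

// Each name is delayed independently by its absolute offset from subscription
Observable<String> delayedNames = names
        .zipWith(absoluteDelayMillis,
                (n, d) -> Observable.just(n).delay(d, TimeUnit.MILLISECONDS))
        .flatMap(o -> o);

delayedNames
        .sample(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);
```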
First, we construct a sequence of names followed by a sequences of absolute delays (in seconds, later mapped to milliseconds). Using the
zipWith() operator, we
delay() the occurrence of certain names. The
sample() operator will periodically (every second) pick the last name seen in the stream within the last period. So, after the first second we print Linda, followed by Barbara a second later. Between 2,000 and 3,000 milliseconds since subscription no name appeared, so
sample() does not emit anything. Two seconds after Barbara was emitted, we see Susan.
sample() forwards completion (and errors, as well), discarding the last, incomplete period. If we want to see Dorothy appear as well, we can artificially postpone the completion notification, as is done here:
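One way to do this, assuming the delayedNames stream built earlier, is to concatenate an empty Observable whose completion is itself delayed:

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

delayedNames
        .concatWith(Observable.<String>empty()
                .delay(1, TimeUnit.SECONDS))   // hold back completion by one extra second
        .sample(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);
```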
sample() has a more advanced variant taking
Observable as an argument rather than a fixed period. This second
Observable (known as sampler) basically dictates when to take a sample from the upstream source: every time sampler emits any value, a new sample is taken (if any new value appeared since the last sample). You can use this overloaded version of
sample() to dynamically change the sampling rate or to take samples only at very specific points in time; for example, taking a snapshot of some value when a new frame is redrawn or when a key is pressed. A trivial example can simply emulate the fixed period by using the interval() operator.
As you can see, there are some subtleties regarding
sample()’s behavior. Rather than relying on our understanding of documentation or manual verification, it is great to have automated tests. Testing time-sensitive operators like
sample() is covered in the section “Virtual Time”.
sample() has an alias in RxJava called
throttleLast(). Symmetrically, there is also the
throttleFirst() operator that emits the very first event that appeared in each period. So, applying
throttleFirst() instead of
sample() in our name stream yields rather expected results:
The output looks like this:
throttleFirst() does not emit any event when no new name appeared between Barbara and Elizabeth.
Buffering and moving windows are among the most exciting built-in operators offered by RxJava. They both traverse input stream through a window that captures several consecutive elements and moves forward. On one hand, they allow batching values from an upstream source to handle them more effectively. In practice, they are flexible and versatile tools that allow various aggregations of data on the fly.
The buffer() operator aggregates batches of events in real time into a List. However, unlike
toList(), buffer() emits several lists, each grouping some number of subsequent events, as opposed to just one list containing all events. The simplest form of
buffer() groups values from the upstream
Observable into lists of equal size:
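A minimal sketch, buffering seven numbers into lists of three:

```java
import java.util.List;

import rx.Observable;

Observable<List<Integer>> buffered = Observable
        .range(1, 7)
        .buffer(3);
buffered.subscribe((List<Integer> list) -> System.out.println(list));
```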
subscribe(System.out::println) would work equally well; we left the type information in for educational purposes. The output shows three events emitted from the buffered Observable:
buffer() keeps receiving upstream events and buffers them (hence the name) internally until the buffer reaches a size of 3. When that happens, the entire buffer (a
List<Integer>) is pushed downstream. When the completion notification appears and the internal buffer is not empty (but not yet of size 3), its contents are pushed downstream anyway. That is why we see a one-element list at the end.
By using the
buffer(int) operator, you can replace several fine-grained events with fewer but bigger batches. For example, if you want to reduce database load, you might replace storing each event individually with storing them in batches:
The latter subscription calls
storeAll on Repository, storing batches of 10 elements at once. This can potentially improve throughput in your application.
buffer() has many overloaded variants. A slightly more complex version allows you to configure how many of the oldest values to drop from the internal buffer when
buffer() pushes the list downstream. That sounds complex, but put in more basic terms, it makes it possible for you to look at your event stream through the moving window of a certain size:
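A sketch of a size-3 window advancing one element at a time:

```java
import rx.Observable;

Observable
        .range(1, 7)
        .buffer(3, 1)   // window of size 3, advancing by one element
        .subscribe(System.out::println);
```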
This yields several overlapping lists:
You can use
buffer(N, 1) variant if you want to compute a moving average of some time-series data. The code example that follows generates 1,000 random values from a normal distribution. Later, we take a sliding window of 100 elements (advancing one element at a time) and compute the average of each such window. Run this program yourself and notice how much smoother the moving average is than the raw random values.
You can probably imagine that calling
buffer(N) is in fact equivalent to
buffer(N, N). The simplest form of
buffer() drops the entire internal buffer when it becomes full. Interestingly, the second parameter of
buffer(int, int) (that specifies how many elements to skip when the buffer is pushed downstream) can be bigger than the first argument, effectively skipping some elements!
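For example, a buffer of size 1 combined with a skip of 2 keeps only every other element:

```java
import java.util.List;

import rx.Observable;

// Keep one element, then skip ahead two positions (counted from the buffer's start)
Observable<List<Integer>> odd = Observable
        .range(1, 10)
        .buffer(1, 2);
```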
This setup forwards the first element but then skips two: the first and the second one. Then the cycle repeats: buffer() forwards the third element but then skips the third and fourth. Effectively, the output is:
Notice that each element in the odd Observable is actually a one-element list. You can use
flatMapIterable() to get back a simple Observable<Integer>.
flatMapIterable() expects a function that transforms each value in the stream (a one-element
List<Integer>) into an Iterable. The identity transformation (list -> list) is enough here.
buffer() is actually a broad family of operators. Rather than batching upstream events based on size (so that each batch has the same size), another variant of
buffer() batches events by time period. While
throttleFirst() and throttleLast() pick the first and last events within a given period of time respectively, one of the overloaded versions of buffer() batches all events in each time period. Coming back to our names example:
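Assuming the delayedNames stream described earlier, time-based buffering can be sketched as:

```java
import java.util.concurrent.TimeUnit;

delayedNames
        .buffer(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);
```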
An overloaded version of
buffer() that accepts a time period (one second in the preceding example) aggregates all upstream events within that period. Therefore,
buffer() collects all events that happened during the first time period, the second time period, and so on:
One of the emitted List<String> instances is empty because no events appeared in that time frame. One of the use cases for
buffer() is counting the number of events per each time period; for example, number of key events per second:
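Sketched with a hypothetical keyEvents stream of keyboard input:

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;

// keyEvents: a hypothetical Observable<KeyEvent> of key presses
Observable<Integer> eventsPerSecond = keyEvents
        .buffer(1, TimeUnit.SECONDS)
        .map(List::size);
eventsPerSecond.subscribe(System.out::println);
```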
Luckily, because a one-second period with no events yields an empty list, we do not have gaps in our measurements. However, this is not the most efficient approach, as we will soon discover with the window() operator.
The most comprehensive overload of
buffer() allows you to take full control over when this operator begins buffering events and when the buffer should be flushed downstream. In other words, you choose in which periods of time upstream events should be grouped. Imagine that you are monitoring some industrial device that pushes telemetric data very often. The amount of data is overwhelming, so to save some computational capacity, you decide to look only at certain samples. The algorithm is as follows:
- During business hours (9:00–17:00), we take 100-millisecond long snapshots every second (processing approximately 10% of data)
- Outside business hours we look only at 200-millisecond long snapshots taken every 5 seconds (4%)
In other words, once every second (or every 5 seconds) we buffer all events for 100 milliseconds (or 200, respectively) and emit lists of all of the events within that period. This will become clearer when you see the entire example.
First, we need an
Observable that emits any value whenever we want to begin buffering (grouping) upstream events. This
Observable can literally push any type of value, but this is irrelevant because only timing matters. The fact that we are returning
Duration from the
java.time package is a coincidence; RxJava does not use this value in any way:
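A sketch of such a stream (the variable names are our choice; isBusinessHour() is the helper discussed below):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import rx.Observable;

Observable<Duration> insideBusinessHours = Observable
        .interval(1, TimeUnit.SECONDS)
        .filter(x -> isBusinessHour())
        .map(x -> Duration.ofMillis(100));
Observable<Duration> outsideBusinessHours = Observable
        .interval(5, TimeUnit.SECONDS)
        .filter(x -> !isBusinessHour())
        .map(x -> Duration.ofMillis(200));

Observable<Duration> openings = insideBusinessHours
        .mergeWith(outsideBusinessHours);
```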
First, using the
interval() operator, we generate timer ticks every second but exclude those that fall outside business hours. This way, we get a steady clock ticking every second between 9:00 and 17:00. Recall that
interval() returns growing natural
Long numbers; however, we do not need them, so for convenience we replace them with a fixed duration of 100 milliseconds. Symmetrical code creates a steady stream of events every 5 seconds between 17:00 and 9:00. If you are curious how
isBusinessHour() is implemented, it uses the java.time API:
The openings stream merges together the business-hours stream and the outsideBusinessHours stream. It is basically a trigger that instructs the
buffer() operator when to begin collecting samples from upstream rather than discarding them. Whatever value is emitted from the openings stream is entirely irrelevant. But we must also specify when to stop aggregating (buffering) events and push them downstream as one batch in a
List. The most obvious solution is to treat each event emitted from
the openings stream as a signal to stop the current batch, emit it downstream, and start another batch:
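With a hypothetical upstream of telemetry readings, this variant is a one-liner:

```java
import java.util.List;

import rx.Observable;

// upstream: a hypothetical Observable<TeleData> of telemetry readings
Observable<List<TeleData>> samples = upstream
        .buffer(openings);
```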
Notice how we pass the carefully crafted
openings stream to the
buffer() operator. The preceding code example slices the upstream source of
TeleData values. The ticking clock of the openings stream batches events from upstream. Within business hours a new batch is created every second; outside business hours, batches group values in five-second periods. Importantly, in this version all events from upstream are preserved, because each of them lands in one batch or another. However, an overloaded version of the
buffer() operator also allows marking the end of a batch:
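A sketch, assuming the same upstream and openings streams; timer() emits once and completes after the given delay, which closes each buffer:

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;

Observable<List<TeleData>> samples = upstream
        .buffer(openings, duration -> Observable
                .timer(duration.toMillis(), TimeUnit.MILLISECONDS));
```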
First recall that
openings is an
Observable<Duration>, but the actual value of events from
openings is not important. RxJava merely uses this event to start buffering TeleData instances. But this time, we have full control over when buffering and the emission of each buffer should occur. The second parameter is an
Observable that must complete whenever we want to stop sampling. Completion of this second stream marks the end of a given batch.
Look carefully: the
openings stream emits an event every time we would like to start a new batch. For each event emitted from
openings we return a new
Observable that should complete some time in the future. So, for example, when the
openings stream emits an event of
Duration.ofMillis(100) value, we transform it to an
Observable that completes when a given batch should end, after 100 milliseconds.
Notice that in this case some events might be dropped or duplicated in consecutive batches. If the second
Observable (the one responsible for marking the end of a given batch) completes before the opening event of the next batch, events within this time gap are discarded by
buffer(). This is our case: we begin buffering events every second (or every 5 seconds outside business hours), but the buffer closes and is forwarded after 100 milliseconds (or 200, respectively). The majority of events fall between buffering periods and are therefore discarded.
The buffer() operator is extremely flexible and quite complex. Make sure that you experiment with it a little and understand the preceding example. It can be used to smartly batch events from the upstream source to achieve grouping, sampling, or moving-window functionality. But because
buffer() requires creating an intermediate
List before the current buffer is closed and passed downstream, it might unnecessarily put pressure on garbage collection and memory usage. Therefore, the
window() operator was introduced.
When working with
buffer() we build List instances over and over. Why do we build these intermediate
Lists rather than somehow consume events on the fly? This is where the
window() operator becomes useful. You should prefer
window() over buffer() if possible, because the latter is less predictable in terms of memory usage. The
window() operator is very similar to
buffer(): it has similar overloaded versions, including ones that do the following:
- Receive an int, grouping events from the source into fixed-size lists
- Receive time unit, grouping events within fixed-time periods
- Receive custom
Observables marking the beginning and end of each batch
What is the difference, then? Remember the example that was counting how many events occurred per second in a given source? Let’s take another look at it:
We batch all events from
Observable<KeyEvent> that occurred in each second into
Observable<List<KeyEvent>>. In the next step, we map
each List into its size. This is quite wasteful, especially if the number of events in each second is significant:
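The window()-based version can be sketched as follows (again using the hypothetical keyEvents stream):

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

keyEvents
        .window(1, TimeUnit.SECONDS)
        .flatMap(eventsInSecond -> eventsInSecond.count())
        .subscribe(System.out::println);
```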
window(), as opposed to
buffer(), returns an
Observable<Observable<KeyEvent>>. Think about it for a moment. Rather than receiving fixed lists with each one containing one batch (or buffer), we receive a stream of streams. Every time a new batch begins (every one second in the preceding example), a new
Observable<KeyEvent> value appears in the outer stream. We can further transform all of these inner streams, but to avoid double-wrapping we use flatMap().
flatMap() receives each buffer (an
Observable<KeyEvent>) as an argument and is supposed to return another Observable. The
count() operator transforms an
Observable<T> into an
Observable<Integer> that emits just one item representing the number of events in the original
Observable. Therefore, for each one-second batch we produce the number of events that occurred within that second. But there is no internal buffering; the
count() operator counts events on the fly as they pass through.
buffer() and window() group several events together so that you can process them in batches, whereas
sample() picks one fairly arbitrary event once in a while. These operators do not take into account how much time elapsed between events. Yet in many cases, an event can be discarded if it is shortly followed by another event. For example, imagine a stream of stock prices flowing from a trading platform:
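A sketch of such a stream (tradingPlatform.pricesOf() is a hypothetical source of price updates):

```java
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import rx.Observable;

// pricesOf(...) is assumed to push a new BigDecimal on every price change
Observable<BigDecimal> prices = tradingPlatform.pricesOf("NFLX");
Observable<BigDecimal> debounced = prices
        .debounce(100, TimeUnit.MILLISECONDS);
```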
debounce() (also known as throttleWithTimeout()) discards all of the events that are shortly followed by another event. In other words, if a given event is not followed by another event within a time window, that event is emitted.
In the preceding example, the prices stream pushes prices of
"NFLX" stock every time they change. Prices sometimes change very frequently, dozens of times per second. For each price change we would like to run some computation that takes a significant amount of time to complete. However, if a new price arrives, the result of this computation is irrelevant; it must begin from scratch with this new price. Therefore, we would like to discard events if they are followed (suppressed by) a new event shortly after.
debounce() waits a little (100 milliseconds in the preceding example) just in case a second event appears shortly after. This process repeats itself, so if a second event appears within 100 milliseconds of the first one, RxJava postpones its emission, hoping for a third one to appear. Again, you have the option to flexibly control how long to wait on a per-event basis.
For example, you might want to ignore stock price changes if they are followed by an update in less than 100 milliseconds. However, if the price goes above $150, we would like to forward such an update downstream much faster, without hesitation; maybe because some types of events need to be handled straight away, for example because they represent great market opportunities. You can implement this easily by using an overloaded version of debounce():
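A sketch, reusing the prices stream from before (the $150 threshold and the 10/100-millisecond delays match the discussion below):

```java
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import rx.Observable;

prices
        .debounce(x -> {
            boolean goodPrice = x.compareTo(BigDecimal.valueOf(150)) > 0;
            // Empty Observable whose (delayed) completion controls the debounce window
            return Observable
                    .empty()
                    .delay(goodPrice ? 10 : 100, TimeUnit.MILLISECONDS);
        })
        .subscribe(System.out::println);
```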
For each new update of price
x, we apply sophisticated logic (
> $150) to figure out if the price is good. Then, for each such update we return a unique
Observable, which is empty. It does not need to emit any items; what matters is when it completes. For good prices, it emits a completion notification after 10 milliseconds. For other prices, this
Observable completes after 100 milliseconds. The
debounce() operator, for each event it receives, subscribes to this
Observable and waits for its completion. If it completes first, the event is passed downstream. Otherwise, if a more recent upstream event appeared in the meantime, the cycle repeats.
In our example, when a price x of $140 appears, the
debounce() operator creates a new
Observable with completion delayed by 100 milliseconds via the expression we provided. If no events appear before this Observable completes, the $140 event is forwarded downstream. However, imagine that another price update, x of $151, comes along. This time, when the
debounce() operator asks us to provide an
Observable (called a debounceSelector in the API), we return a stream that completes much faster, after 10 milliseconds. So in the case of good prices (greater than $150), we are willing to wait only 10 milliseconds for a subsequent update. If you still struggle to understand how
debounce() works, here is a stock price simulator you can try:
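A rough simulator along the lines described below (the exact amplitude and jitter values are arbitrary assumptions):

```java
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import rx.Observable;

Observable<BigDecimal> prices = Observable
        .interval(50, TimeUnit.MILLISECONDS)
        // Delay each event independently by a random 0-100ms
        .flatMap(x -> Observable
                .just(x)
                .delay((long) (Math.random() * 100), TimeUnit.MILLISECONDS))
        // Turn growing longs into a sine wave with random jitter
        .map(x -> 100 + 100 * Math.sin(x / 100.0) + Math.random() * 10)
        .map(BigDecimal::valueOf);
```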
The preceding code nicely composes several streams. First, we generate a sequence of long values emitted in fixed 50-millisecond intervals. Then, we delay each event independently by some random value between 0 and 100 milliseconds. Last but not least, we transform infinitely growing long numbers into a sine wave (using
Math.sin()) with random jitter. This simulates stock price fluctuation over time.
If you run this stream against the
debounce() operator, you will notice that as long as prices are low, events are generally infrequent, because we are willing to wait as much as 100 milliseconds for a subsequent event, which often arrives sooner. But when the price goes above $150, the
debounce() tolerance goes down to 10 milliseconds, so effectively every good price update is forwarded downstream.
It is quite easy to imagine a situation in which the
debounce() operator prevents emission of all events because they simply appear too often and there is never a moment of silence:
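A sketch of such a pathological source, with events appearing every 99 milliseconds against a 100-millisecond debounce window:

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

Observable
        .interval(99, TimeUnit.MILLISECONDS)
        .debounce(100, TimeUnit.MILLISECONDS)
        .subscribe(System.out::println);   // never prints anything
```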
Such a source will never emit any event because
debounce() waits as much as 100 milliseconds to ensure that there is no more recent event. Unfortunately, just 1 millisecond before this timeout, a new event appears, starting
debounce’s timer all over. This leads to an
Observable that produces events so often that we may never get to see any of them(!). You can call this a feature, but in practice you might want to see some events from time to time, even in the case of a flood. To prevent such a situation, we must get a little bit creative.
First, we must detect a situation in which no new event has appeared for a long time. We already played with the
timeout() operator in the section “Timing Out When Events Do Not Occur”, so we know that part is easy:
Now, we at least get an exception signaling an idle upstream source. Bizarrely, it is quite the opposite (the upstream
interval() operator produces events too often, and because of that
debounce() never passes them downstream), but we digress. If events appear too often, we hold them back, waiting for a moment of silence. But if this silence is too long (more than one second), we fail and throw a
TimeoutException. Rather than failing permanently, we would actually like to see an arbitrary value from the upstream
Observable and continue. The first part of the task is simple:
The timeout() operator has an overloaded version that accepts a fallback
Observable to use upon timeout. Unfortunately, there is a subtle bug here. In the case of a timeout, we naively take the first encountered item from upstream and then complete. What we really want is to continue emitting events from upstream, still with debounce() applied.
Another approach is seemingly better:
It looks somewhat OK at first sight. The original source, after applying
debounce(), has a timeout. When a timeout occurs, we emit the very first item we encounter and continue with the same source, also by using the
debounce() operator. However, in the case of a first timeout, we switch to the fallback
Observable that no longer has a
timeout() operator applied. A quick, dirty, and short-sighted fix:
Yet again, we forgot to put a fallback
Observable in the inner
timeout() operator. Enough is enough; you should already see a recurring pattern here. Rather than infinitely repeating the same transformation of upstream, we can use recursion!
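The recursive version can be sketched like this (an element type of Long is assumed for concreteness):

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;

Observable<Long> timedDebounce(Observable<Long> upstream) {
    // On timeout: emit one raw sample, then recursively apply the same logic.
    // defer() postpones the recursive call, avoiding infinite recursion.
    Observable<Long> onTimeout = upstream
            .take(1)
            .concatWith(Observable.defer(() -> timedDebounce(upstream)));
    return upstream
            .debounce(100, TimeUnit.MILLISECONDS)
            .timeout(1, TimeUnit.SECONDS, onTimeout);
}
```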
The definition of the onTimeout fallback inside
timedDebounce is tricky. We declare that it first takes one sample event from upstream (which is the original source), followed by a recursively invoked
timedDebounce() method. We must use the
defer() operator to avoid infinite recursion. The rest of the
timedDebounce() basically takes the original upstream source, applies the
debounce() operator, and adds the onTimeout fallback. This fallback does the exact same thing: it applies
debounce(), adds a
timeout() and a fallback, recursively.
Do not become depressed if you find it difficult to grasp at first. This is a rather complex example showing the power of stream composition together with laziness and recursion. You hardly ever need that level of complexity, but after you grasp how it works, it is quite satisfying. Play around with this code and observe how tiny changes drastically alter the way streams interact with one another.
Backpressure is quite essential to building robust and responsive applications. In essence, it is a feedback channel from the consumer to the producer. The consumer gets a certain level of control over how much data it can process at any time. This way, consumers and messaging middleware do not become saturated and unresponsive under high load. Instead, they request fewer messages, letting the producer decide how to slow down.
In every system that exchanges data via message passing (or events for that matter), a problem of consumers not keeping up with producers can arise. Depending on the underlying implementation, it can manifest in different ways. If the communication channel somehow synchronizes producers and consumers (for example, by using
ArrayBlockingQueue), the producer is throttled (blocked) when the consumer is not keeping up with the load.
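The effect is easy to demonstrate with plain Java: with a bounded queue of capacity 2, put() blocks until the slow consumer makes room, so the producer can never run more than two items ahead (the class and capacity here are our own illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueDemo {

    public static List<Integer> run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.put(i);  // blocks while the queue already holds 2 items
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        // Slow consumer: its pace transparently throttles the producer
        for (int i = 0; i < 10; i++) {
            consumed.add(queue.take());
            Thread.sleep(5);  // simulate slow processing
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```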
This leads to coupling between the producer and consumer, which should otherwise be entirely independent. Message passing typically implies asynchronous processing, an assumption that fails when the producer suddenly must wait for the consumer. Even worse, the producer might itself be a consumer of a different producer higher in the hierarchy, cascading the increased latency up the chain.
Conversely, if the medium in between these two parties is unbounded, well… it is still bounded by factors over which we have less control. An unbounded queue like
LinkedBlockingQueue allows the producer to greatly outperform consumers without blocking. That is, until
LinkedBlockingQueue consumes all available memory and crashes the entire application.
If the medium is persistent (for example, a JMS message broker), the same problem can technically manifest itself with disk space, but this is less probable. Far more common is a situation in which the messaging middleware struggles to manage thousands, if not millions, of unconsumed messages. Some specialized brokers, such as Kafka, can technically store hundreds of millions of messages until lagging consumers catch up with them. But this leads to an enormous increase in latency, measured as the time between message production and consumption.
Although message-driven systems are generally considered more robust and scalable, the problem of overly eager producers remains unsolved. However, there are some efforts to address this integration issue. Sampling and throttling (using
sample() and others) and batching (using
buffer()) are manual ways of reducing producer load in RxJava.
If an Observable generates more events than can be consumed, we can apply sampling or batching to increase subscription throughput. Yet a more robust and systematic approach was needed; hence, the Reactive Streams initiative was born out of necessity. This small set of interfaces and semantics aims to formalize the problem and provide a systematic algorithm for producer–consumer coordination, known as backpressure.
Backpressure is a simple protocol that allows the consumer to request how much data it can consume at a time, effectively providing a feedback channel to the producer. Producers honor these requests, avoiding message overflow. Of course, this algorithm can work only with producers that are capable of throttling themselves; for example, when they are backed by a static collection or a source that can be pulled from, such as an
Iterator. When the producer has no control over the frequency of the data it produces (the source is external or hot), backpressure cannot help much.
Even though Reactive Streams solves a very general problem in a technology-agnostic way, we will focus on RxJava and how it approaches backpressure. Throughout this chapter, we will use the example of continually washing dishes in a small restaurant. Dishes are modeled as large objects with an identifier:
The oneKb buffer simulates some extra memory utilization. Dishes are passed to the kitchen by waiters and are modeled as an
Observable<Dish>. The range() operator produces new values as fast as it possibly can. So what happens if washing dishes takes a little bit of time and is clearly slower than the pace of production?
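A sketch of that setup, assuming a Dish class holding the oneKb buffer (sleepMillis() is a hypothetical helper wrapping Thread.sleep()):

```java
import rx.Observable;

Observable<Dish> dishes = Observable
        .range(1, 1_000_000_000)
        .map(Dish::new);

dishes.subscribe(x -> {
    System.out.println("Washing: " + x);
    sleepMillis(50);   // hypothetical helper simulating slow washing
});
```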
Surprisingly, nothing bad happens. If you study the output, you will notice that emissions from range() are perfectly interleaved with the subscriber's consumption:
This should not come as a surprise to you. The
range() operator is not asynchronous by default, so every item it produces is passed to a
Subscriber directly within the context of the same thread. If the
Subscriber is slow, it effectively prevents the Observable from producing more elements.
range() cannot call
onNext() of the
Subscriber until the previous call has finished. This is possible because both producer and consumer work in the same thread and are transparently coupled. In a sense, there is an implicit queue between them with a maximum capacity of one.
This is a rendezvous algorithm that we did not anticipate. Imagine a waiter in a restaurant who cannot leave new dishes for cleaning as long as the ones currently being washed are not done. While the waiter stands still waiting for the dishwashing to finish, customers are not served. And when they are not served, new customers cannot enter the restaurant. This is how one blocking component can bring the entire system to a standstill. However, in real life there is typically a thread boundary between producer and consumer:
Observable produces events in one thread, whereas
Subscriber consumes in another:
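Crossing that boundary can be sketched with observeOn() (sleepMillis() remains the hypothetical helper from before; 50 milliseconds per dish is roughly 20 dishes per second):

```java
import rx.Observable;
import rx.schedulers.Schedulers;

dishes
        .observeOn(Schedulers.io())
        .subscribe(x -> {
            System.out.println("Washing: " + x);
            sleepMillis(50);   // hypothetical helper: ~20 dishes per second
        });
```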
Stop for a moment and think about what could happen without actually compiling and running the code. One might think that a disaster should occur because
dishes produces events very fast from the
range() operator, whereas
Subscriber is quite slow, consuming only 20 dishes per second.
The observeOn() operator keeps receiving events in quick succession, but the Subscriber consumes them way too slowly. Therefore, you might conclude that
OutOfMemoryError is unavoidable, with unprocessed events piling up somewhere. Luckily, backpressure saves the day in this case, and RxJava protects us to some degree. The output of the program is somewhat unexpected:
First, a batch of 128 dishes is being produced by
range() pretty much instantaneously. Later, there is a slow process of washing dishes, one by one. Somehow the
range() operator becomes idle. When the last dish out of these 128 is washed, another batch of 96 dishes is produced by
range(), followed by a slow process of washing.
Apparently, there must be some clever mechanism that prevents
range() from producing too many events, controlled by the subscriber. If you do not see where such a mechanism is deployed, let's try to implement a naive version ourselves:
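A naive re-implementation, deliberately ignoring backpressure, might look like this:

```java
import rx.Observable;

static Observable<Integer> myRange(int from, int count) {
    return Observable.create(subscriber -> {
        for (int i = from; i < from + count; i++) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            subscriber.onNext(i);   // pushes as fast as possible, ignoring demand
        }
        subscriber.onCompleted();
    });
}
```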
Here, we’re using
myRange() in the same example, together with observeOn():
This ends with catastrophe, and we never even get to wash any dish:
MissingBackpressureException will be explained later on. For the time being, I hope this convinces you that there is some background mechanism that our custom implementation of
range() is lacking.
For the past several chapters we watched how events were flowing downstream from the source
Observable, through a sequence of operators, down to a
Subscriber. There was never any feedback channel past a subscription request. The moment we invoked
subscribe() (which in some sense propagates up), all events and notifications travel down without any apparent feedback loop. This lack of feedback can lead to producers (the uppermost
Observable) emitting events at a rate that overwhelms the subscriber. As a consequence, your application can crash with
OutOfMemoryError or, at best, suffer from very high latency.
Backpressure is a mechanism that allows terminal subscribers as well as all intermediate operators to request only a certain number of events from the producer. By default, an upstream cold
Observable produces events as fast as it can. But in the presence of such requests coming from downstream, it should, in a way, "slow down" and produce exactly the number requested. This is the reason behind the magic number 128 seen with
observeOn(). But first, let's see how the final subscriber can control backpressure.
When subscribing, we have the possibility of implementing callbacks such as
onError(). It turns out there is another callback method to implement:
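That callback is onStart(); a sketch that requests just three items up front:

```java
import rx.Observable;
import rx.Subscriber;

Observable
        .range(1, 10)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onStart() {
                request(3);   // ask the upstream for only three items initially
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onCompleted() { /* ignore */ }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
        });
```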
onStart() is invoked by RxJava exactly when you think it should — before any event or notification is propagated to
Subscriber. You can technically use a constructor of your
Subscriber, but for anonymous inner classes in Java, constructors look really eerie:
But we digress. The
request(3) invocation inside a
Subscriber instructs the upstream source how many items we are willing to receive at first. Skipping this invocation entirely (or calling
request(Long.MAX_VALUE)) is equivalent to requesting as many events as possible.
This is the reason why we must invoke
request() very early; otherwise, the stream begins to emit events and we cannot decrease our demand later on. But when we request only three events, the
range() operator will obediently stop emitting events temporarily after pushing 1, 2, and 3. Our
onNext() callback method will be invoked three times and no more, despite
range() operator not being completed yet. However, we, as a
Subscriber, have full control over how much data we want to receive. For example, we might want to request items individually:
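A minimal sketch of such a one-at-a-time Subscriber might look like this, using the built-in range() as the source:

```java
import rx.Observable;
import rx.Subscriber;

public class OneByOne {
    public static void main(String[] args) {
        Observable.range(1, 5).subscribe(new Subscriber<Integer>() {
            @Override
            public void onStart() {
                request(1);              // ask for the very first item
            }

            @Override
            public void onNext(Integer value) {
                System.out.println(value);
                request(1);              // ask for exactly one more item
            }

            @Override
            public void onCompleted() {
                System.out.println("Completed");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
        });
    }
}
```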
This example is a bit silly because it behaves just like an ordinary
Subscriber without any backpressure whatsoever. But it illustrates how you can use backpressure. You can probably imagine a
Subscriber that prebuffers some number of events and then requests chunks when it finds it convenient.
A Subscriber might decide that it wants to wait a little bit before receiving more events, despite being idle, for example, to reduce stress on some downstream dependency. In our restaurant example, the waiter is an
Observable<Dish> that keeps pushing new dirty dishes whereas
request(N) is the readiness of the kitchen staff to wash a certain number of dishes. A good waiter should not deliver new dishes without a request from the kitchen staff.
That being said, calling
request(N) directly in client code is rare. More often, the various operators that we put between the source and the final Subscriber take advantage of backpressure to control how much data flows through our pipeline. For example,
observeOn() must subscribe to the upstream
Observable and schedule each event it receives on a particular
Scheduler, such as
io(). But what if upstream produces events at such a pace that the underlying
Subscriber can no longer keep up? The
Subscriber that is being created by the
observeOn() operator is backpressure-enabled: it requests only 128 values to begin with. An upstream
Observable that understands backpressure emits only the requested number of events and then remains idle — this is what
range() does, for example. When
observeOn() finds that this batch of events was successfully processed by the downstream
Subscriber, it requests more. This way, despite crossing a thread boundary and the asynchronous nature of both producer and consumer side, the consumer is never flooded with events.
observeOn() is not the only operator that is backpressure friendly. As a matter of fact, dozens of other operators take advantage of it. For example,
zip() buffers only a fixed number of events from each underlying
Observable. Thanks to this,
zip() is not affected when just one of the zipped streams is very active. The same logic applies to most of the operators we use.
We already came across a
MissingBackpressureException in our custom implementation of
range(). What does it actually mean and how do you interpret this exception? Imagine a
Subscriber (yours, but more often one created by some operator) that knows exactly how many items it wants to receive; for example,
Another example of such an operator is
observeOn(). It must be very strict in that regard: if the upstream
Observable pushes more items for some reason, the internal buffer inside
observeOn() overflows, and this is signaled with
MissingBackpressureException. But why does an upstream
Observable push more items than requested? Well, because it simply ignores the
request() invocations. Let’s revisit our simple myRange() implementation:
The only way to stop it is by unsubscribing, but we do not want to unsubscribe, just slow it down a little bit. Downstream operators know precisely how many events they want to receive, but our source ignores these requests. The low-level mechanism for honoring the requested number of events is implemented via the rx.Producer interface. This interface is plugged in within
create(). To recap,
OnSubscribeRange is a callback that is executed every time someone subscribes to this
Observable. Normally, you would see
onNext() called directly from within this callback, but not when backpressure is taken into account:
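A greatly simplified sketch of that pattern could look as follows. Unlike the real RxJava implementation, this version is not thread-safe; it only illustrates how a Producer reacts to requests:

```java
import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;

// Simplified sketch: not thread-safe, unlike the real implementation
class OnSubscribeRange implements OnSubscribe<Integer> {

    private final int start;
    private final int end;

    OnSubscribeRange(int start, int count) {
        this.start = start;
        this.end = start + count - 1;
    }

    @Override
    public void call(Subscriber<? super Integer> child) {
        // No direct onNext() here: instead we hand the child a Producer
        // that will be invoked for every request(N) from downstream
        child.setProducer(new RangeProducer(child, start, end));
    }

    static class RangeProducer implements Producer {
        private final Subscriber<? super Integer> child;
        private final int end;
        private int index;

        RangeProducer(Subscriber<? super Integer> child, int start, int end) {
            this.child = child;
            this.end = end;
            this.index = start;
        }

        @Override
        public void request(long n) {
            // emit at most n values, then go idle until the next request
            while (n-- > 0 && index <= end && !child.isUnsubscribed()) {
                child.onNext(index++);
            }
            if (index > end && !child.isUnsubscribed()) {
                child.onCompleted();
            }
        }
    }
}
```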
This is the skeleton of code you will find in RxJava’s implementation of
range(). Implementing a Producer is quite a challenging task: it must be stateful, thread-safe, and extremely fast. Thus, we do not normally implement producers ourselves, but it is useful to understand how they work. Backpressure internally turns Rx principles upside down. An
Observable produced by
range() (and many other built-in operators) no longer pushes data eagerly to
Subscribers. Instead, it wakes up and reacts to data requests (
request(N) invocations within
Subscriber) and only then produces events. Also, it makes sure not to produce more than was requested.
Look how we set a
Producer on child
Subscriber — this
Producer will later be invoked indirectly within
Subscriber whenever it calls
request(). This is how we set up a feedback channel from
Subscriber to the source
Observable. The Observable instructs its
Subscriber how it can request a certain amount of data. Effectively, the
Observable switches from a push to a pull–push model, where clients can optionally request only a limited number of events.
So, what to do if some foreign
Observable does not set up such a channel? Well, when RxJava discovers that it’s dealing with a source that does not support backpressure, it can fail with
MissingBackpressureException at any time. However, there are operators from the
onBackpressure*() family that can simulate backpressure to some extent.
The onBackpressureBuffer() operator unconditionally buffers all upstream events and serves only the requested amount of data to downstream subscribers:
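A sketch of such a pipeline, reusing the backpressure-ignoring myRange() source from before (the slow consumer is illustrative):

```java
import rx.Observable;
import rx.schedulers.Schedulers;

public class Buffered {

    // backpressure-ignoring source, like our earlier myRange()
    static Observable<Integer> myRange(int from, int count) {
        return Observable.create(s -> {
            for (int i = from; i < from + count && !s.isUnsubscribed(); i++) {
                s.onNext(i);
            }
            s.onCompleted();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        myRange(1, 1_000_000)
                .onBackpressureBuffer()       // absorb the flood of events
                .observeOn(Schedulers.io())   // requests 128 events at a time
                .subscribe(Buffered::slowConsumer);
        Thread.sleep(2_000);                  // keep the JVM alive for a while
    }

    // hypothetical slow consumer
    static void slowConsumer(int value) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException ignored) { }
    }
}
```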
As always, reading from bottom to top: first
subscribe() propagates up the chain.
observeOn() must subscribe as well, but it cannot simply begin consuming an arbitrary number of events. Thus, it requests only a fixed number at the beginning (128) to avoid overflowing the
io() Scheduler’s queue. The
onBackpressureBuffer() operator acts as a guard against sources ignoring backpressure. When it receives
request(128) from the downstream
Subscriber, it passes the request up and does nothing as long as only 128 events flow through it. But in the event that the upstream
Observable ignores that request and simply pushes data irrespective of backpressure,
onBackpressureBuffer() keeps the excess in an unbounded internal buffer.
When another request comes from a downstream Subscriber,
onBackpressureBuffer() first drains its internal buffer, and only when it is almost empty does it ask upstream for more. This clever mechanism allows
observeOn() to work as if
myRange() was backpressure-enabled, whereas in reality it is
onBackpressureBuffer() that does the throttling. Unfortunately, an infinite internal buffer is not something that you can treat lightheartedly:
Of course, your mileage may vary, and with smaller events and sufficient amount of memory,
onBackpressureBuffer() can technically work. But in reality, you should never rely on unbounded resources. Neither memory nor your solid-state drive is infinite. Luckily, there is an overloaded version of
onBackpressureBuffer(N) that accepts the maximum buffer size:
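A sketch of how the bounded overload might be used; the myRange() source and the warning message are illustrative:

```java
import rx.Observable;
import rx.schedulers.Schedulers;

public class BoundedBuffer {

    // backpressure-ignoring source, as in the earlier sketch
    static Observable<Integer> myRange(int from, int count) {
        return Observable.create(s -> {
            for (int i = from; i < from + count && !s.isUnsubscribed(); i++) {
                s.onNext(i);
            }
            s.onCompleted();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        myRange(1, 1_000_000_000)
                .onBackpressureBuffer(1_000, () ->
                        System.err.println("Buffer of 1,000 elements is full"))
                .observeOn(Schedulers.io())
                .subscribe(
                        x -> sleep(10),                 // deliberately slow consumer
                        err -> err.printStackTrace());  // expect MissingBackpressureException
        Thread.sleep(2_000);
    }

    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException ignored) { }
    }
}
```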
The second parameter is optional; it is a callback invoked when the bounded buffer of 1,000 elements is full — when, despite buffering, the
Subscriber still cannot process events at a satisfactory pace. It does not allow any recovery, so expect a
MissingBackpressureException immediately following the warning message. At least we have control over the buffer size, rather than being bound only by the limits of the hardware or operating system.
An alternative to onBackpressureBuffer() is
onBackpressureDrop(), which simply discards all events that appear without a prior request(). Imagine a waiter in a restaurant who keeps delivering new dishes to the kitchen.
onBackpressureBuffer() is a finite/infinite table with dishes waiting to be washed.
onBackpressureDrop(), on the other hand, is a waiter who simply throws away dirty dishes if there is no washing capacity at the moment. This is not a very sustainable business model, but at least the restaurant can keep serving clients:
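A sketch in the same spirit; the myRange() source and the “dish washing” consumer are illustrative:

```java
import rx.Observable;
import rx.schedulers.Schedulers;

public class Dropping {

    // backpressure-ignoring source, as in the earlier sketch
    static Observable<Integer> myRange(int from, int count) {
        return Observable.create(s -> {
            for (int i = from; i < from + count && !s.isUnsubscribed(); i++) {
                s.onNext(i);
            }
            s.onCompleted();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        myRange(1, 1_000_000)
                .onBackpressureDrop(dish ->
                        System.out.println("Throwing away dish " + dish))
                .observeOn(Schedulers.io())
                .subscribe(Dropping::wash);
        Thread.sleep(2_000);
    }

    // hypothetical slow "dish washing"
    static void wash(int dish) {
        try { Thread.sleep(1); } catch (InterruptedException ignored) { }
    }
}
```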
The callback is optional and it notifies us every time an event had to be discarded because it appeared without being requested. It is a good idea to keep track of how many events we dropped; this can be an important metric. Finally, there is
onBackpressureLatest(), which is quite similar to
onBackpressureDrop(), but keeps a reference to the very last dropped element so that in case of a late
request() from downstream, the last seen value from upstream is served.
The onBackpressure*() family of methods is used to bridge between operators and subscribers that request backpressure and
Observables that do not support it. However, it is better to either use or create sources that support it natively.
There are many ways to construct an
Observable that supports downstream backpressure requests. The easiest solution is to use built-in factory methods like
from(Iterable<T>), which creates a source backed by an
Iterable but with backpressure built-in. This means that such an
Observable will not emit all values from
Iterable at once; rather, it will do so gradually as requests flow in from consumers. Note that this does not imply loading all data into memory upfront: an
Iterable is basically a factory of
Iterators, so we can safely load data on the fly.
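To illustrate, here is a sketch of an infinite, lazily computed Iterable wrapped with from(); thanks to backpressure, only the requested elements are ever pulled from the iterator:

```java
import java.util.Iterator;

import rx.Observable;

public class LazyIterable {
    public static void main(String[] args) {
        // Iterable producing natural numbers on demand -- nothing is precomputed
        Iterable<Integer> naturals = () -> new Iterator<Integer>() {
            private int current = 0;

            @Override
            public boolean hasNext() {
                return true;   // infinite sequence
            }

            @Override
            public Integer next() {
                return current++;
            }
        };

        Observable.from(naturals)
                .take(5)
                .subscribe(System.out::println);  // prints 0 through 4
    }
}
```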
An interesting example of a backpressure-enabled
Observable is wrapping
ResultSet from JDBC into a stream. Notice that
ResultSet is pull-based, just like the backpressure-enabled
Observable. But it is not an
Iterator, so we must first convert it to one.
Object[] is a loosely typed representation of a single row from the database:
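A sketch of such an adapter, in the spirit of DbUtils’ ResultSetIterator, with checked SQLExceptions crudely rethrown rather than handled:

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

// Simplified ResultSet -> Iterator<Object[]> adapter, no real error handling
class ResultSetIterator implements Iterator<Object[]> {

    private final ResultSet rs;

    ResultSetIterator(ResultSet rs) {
        this.rs = rs;
    }

    @Override
    public boolean hasNext() {
        try {
            return !rs.isLast();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Object[] next() {
        try {
            rs.next();
            final int columns = rs.getMetaData().getColumnCount();
            final Object[] row = new Object[columns];
            for (int i = 0; i < columns; i++) {
                row[i] = rs.getObject(i + 1);   // JDBC columns are 1-based
            }
            return row;
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
```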
The preceding converter is a very simplified version without error handling, extracted from
ResultSetIterator, as found in the Apache Commons DbUtils open source utility library. This class also provides a simplistic conversion to Iterable<Object[]>:
Having all of these converters in place, we can finally build
Observable<Object> backed by
ResultSet with backpressure support:
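A sketch of the wiring; the JDBC URL and table name are hypothetical, and ResultSetIterator stands for the ResultSet-to-Iterator adapter discussed above (e.g., from Apache Commons DbUtils):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import rx.Observable;

public class RsObservable {

    public static void main(String[] args) throws Exception {
        // URL and query are purely illustrative
        try (Connection connection =
                     DriverManager.getConnection("jdbc:h2:mem:test");
             PreparedStatement stmt =
                     connection.prepareStatement("SELECT * FROM huge_table")) {
            stmt.setFetchSize(1_000);   // hint the driver to stream rows
            ResultSet rs = stmt.executeQuery();

            // ResultSetIterator: the ResultSet -> Iterator<Object[]> adapter
            Observable<Object[]> rows =
                    Observable.from(() -> new ResultSetIterator(rs));

            rows.subscribe(row -> System.out.println(row[0]));
        }
    }
}
```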
Such an Observable supports backpressure out of the box because the built-in
from() operator supports it. Therefore, the throughput of
Subscriber is not relevant anymore and we will no longer see
MissingBackpressureException. Notice that
setFetchSize() is necessary; otherwise, some JDBC drivers might try to load all records into memory, which is quite inefficient if we want to stream over a large result set.
As we already mentioned, the low-level mechanism for supporting backpressure is a custom implementation of Producer. However, this task is quite error-prone, thus a helper class was created, namely
SyncOnSubscribe. This implementation of
Observable.OnSubscribe is pull-based and has backpressure transparently built in.
Let’s begin with the simplest case of a stateless
Observable — which is hardly ever found in real life. This type of Observable does not hold any state in between
onNext() invocations. But even the simplest
just() must remember which items were already emitted. One of the few useful
Observables without state emits random numbers:
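A sketch of such a stateless source; the lambda is invoked once for every requested event:

```java
import rx.Observable;
import rx.observables.SyncOnSubscribe;

public class RandomStream {
    public static void main(String[] args) {
        Observable<Double> rand = Observable.create(
                SyncOnSubscribe.<Double>createStateless(
                        // invoked once per requested event; no state in between
                        observer -> observer.onNext(Math.random())));

        rand.take(3).subscribe(System.out::println);  // three random doubles
    }
}
```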
The resulting Observable is an ordinary
Observable that you can transform, combine, and subscribe to. But underneath, it has full-fledged backpressure support. If a Subscriber or any other operator in the pipeline requests a limited number of events, this
Observable will correctly obey the orders. The only thing we must provide to
createStateless() is a lambda expression that is invoked for each requested event; so if downstream calls
request(3), this custom expression is invoked three times, assuming that each invocation emits just one event. There is no context (state) in between invocations, thus it is called stateless.
Now let’s build a stateful operator. This variation of
SyncOnSubscribe allows an immutable state variable that is passed between invocations. Also, each invocation must return a new state value. As an example, we will build an unbounded generator of natural numbers, beginning at zero. Such an operator is actually quite useful if you want to zip an arbitrarily long sequence with monotonically increasing natural numbers.
range() will work as well, but it requires providing an upper limit, which is not always practical:
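A sketch of such a generator of natural numbers built with the stateful variant:

```java
import rx.Observable;
import rx.observables.SyncOnSubscribe;

public class Naturals {
    public static void main(String[] args) {
        Observable<Long> naturals = Observable.create(
                SyncOnSubscribe.<Long, Long>createStateful(
                        () -> 0L,                    // lazily created initial state
                        (cur, observer) -> {
                            observer.onNext(cur);    // push exactly one event
                            return cur + 1;          // return the new, immutable state
                        }));

        naturals.take(5).subscribe(System.out::println);  // 0, 1, 2, 3, 4
    }
}
```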
This time we provide two lambda expressions to the
createStateful() factory method. The first lazily creates the initial state — zero in this case. The second expression is more important: it is supposed to push one item downstream based on the current state and return the new state value. The state is expected to be immutable, thus this method requires returning a new state as opposed to mutating the existing one. You can easily rewrite the naturals
Observable so that it returns
BigInteger instead and prevents hypothetical overflow. This
Observable can produce an infinite number of increasing natural numbers, yet it fully supports backpressure. This means that it can adjust the speed at which it produces events based on each
Subscriber’s preferences. Compare this to a naive implementation that is undeniably much simpler but falls short in the case of slow Subscribers:
If you prefer a single state variable that mutates while you traverse it (like
ResultSet from JDBC),
SyncOnSubscribe has a method for you as well. The following code does not compile due to checked exceptions, but we want to first highlight the overall usage pattern:
There are three callbacks to implement:
- Generator of state. This lambda expression is invoked once to produce the state variable that will be passed as an argument to the subsequent expressions
- Callback generating the next value, typically based on the state. This callback is free to mutate the state given as its first argument
- Callback invoked upon unsubscription. This is the place to clean up
The more complete implementation with error handling looks as follows. Note that errors occurring during unsubscription are really difficult to propagate properly downstream:
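A sketch of such a single-state implementation, streaming rows from a hypothetical JDBC query (URL and table are illustrative); the mutable state is the ResultSet itself:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import rx.Observable;
import rx.observables.SyncOnSubscribe;

public class RsSyncOnSubscribe {

    static Observable<Object[]> rows() {
        return Observable.create(SyncOnSubscribe.createSingleState(
                () -> {
                    try {
                        Connection c =
                                DriverManager.getConnection("jdbc:h2:mem:test");
                        PreparedStatement stmt =
                                c.prepareStatement("SELECT * FROM huge_table");
                        stmt.setFetchSize(1_000);
                        return stmt.executeQuery();
                    } catch (SQLException e) {
                        throw new RuntimeException(e);   // wrap checked exception
                    }
                },
                (rs, observer) -> {
                    try {
                        if (rs.next()) {
                            int cols = rs.getMetaData().getColumnCount();
                            Object[] row = new Object[cols];
                            for (int i = 0; i < cols; i++) {
                                row[i] = rs.getObject(i + 1);
                            }
                            observer.onNext(row);    // exactly one row per request
                        } else {
                            observer.onCompleted();
                        }
                    } catch (SQLException e) {
                        observer.onError(e);
                    }
                },
                rs -> {
                    try {
                        rs.close();
                    } catch (SQLException e) {
                        // errors here are hard to propagate downstream
                    }
                }));
    }
}
```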
SyncOnSubscribe is a handy utility that allows you to write backpressure-enabled
Observables. It is slightly more complex compared to
Observable.create(), but the benefits of backpressure controlled by each Subscriber are difficult to overestimate. You should avoid using the
create() operator directly and instead consider built-in factories like SyncOnSubscribe.
Backpressure is an amazingly powerful mechanism for controlled throttling of
Subscribers. The feedback channel obviously brings some overhead, but the advantages of loosely coupled yet managed producers and consumers are enormous. Backpressure is often batched, so the overhead is minimal, but if
Subscriber is really slow (even briefly), this slowness is immediately reflected and the overall system stability is preserved. Missing backpressure can be mitigated to some extent by using the
onBackpressure*() family of methods, but not in the long term.
When creating your
Observables, think about correctly handling the backpressure requests. After all, you have no control over the throughput of
Subscribers. Another technique is to avoid heavyweight work in the
Subscriber, off-loading it to
flatMap() instead. For example, rather than storing events in a database within
subscribe() try doing this:
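A sketch of the idea; store() is a hypothetical blocking method that persists an event and returns the UUID of the saved record:

```java
import java.util.UUID;

import rx.Observable;

public class OffloadToFlatMap {

    // hypothetical blocking persistence call
    static UUID store(String event) {
        return UUID.randomUUID();
    }

    public static void main(String[] args) {
        Observable<String> events = Observable.just("a", "b", "c");

        events
                // heavy work happens inside the pipeline, not in subscribe()
                .flatMap(event -> Observable.fromCallable(() -> store(event)))
                .subscribe(uuid -> System.out.println("Stored: " + uuid));
    }
}
```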
Consider making store() more reactive (letting it return an
Observable<UUID> of the saved record) and subscribing only to trigger the side effects:
Or even further, batch UUIDs to reduce logging framework overhead:
By avoiding long-running work in
subscribe(), we reduce the need for backpressure, but it is still a good idea to think about it in advance. Consult the JavaDoc for an indication as to whether the operator supports backpressure. If such information is missing, most likely the operator is not affected by backpressure in any way.
One important takeaway from this chapter is to avoid
Observable.create() and manually emitting events. If you must implement
Observable yourself, consider the many factory methods that support backpressure for you. Also, pay attention to your domain: maybe you can safely skip or batch incoming events to reduce the overall load on the consuming side.