Reactive Programming with RxJava: Flow Control and Backpressure

So far, we’ve become very familiar with the push-based nature of RxJava. Events are produced somewhere up in the stream to be consumed later by all subscribers. We never really paid much attention to what happens when an Observer is slow and cannot keep up with events emitted from within Observable.create(). This entire chapter is devoted to this problem.

RxJava has two ways of dealing with producers being more active than subscribers:

  • Various flow-control mechanisms such as sampling and batching are implemented via built-in operators
  • Subscribers can propagate their demand and request only as many items as they can process by using a feedback channel known as backpressure

Flow Control

Before RxJava began implementing backpressure (see the section “Backpressure”), dealing with producers (Observables) outperforming consumers (Observers) was a difficult task. Quite a few operators were invented to deal with producers pushing too many events, and most of them are quite interesting on their own. Some are useful for batching events; others drop some events. This section walks you through these operators, including some examples.

Taking Periodic Samples and Throttling

There are cases for which you definitely want to receive and process every single event pushed from the upstream Observable. But there are some scenarios for which periodic sampling is enough. The most obvious case is receiving measurements from some device; for example, temperature. The frequency at which the device produces new measurements is often irrelevant for us, especially when the measurements appear often but are very similar to one another. The sample() operator looks at the upstream Observable periodically (for example, every second) and emits the last encountered event. If no event appeared at all in the last one-second period, no sample is forwarded downstream and the next sample will be taken after another second, as illustrated in this example:

long startTime = System.currentTimeMillis();
Observable
    .interval(7, TimeUnit.MILLISECONDS)
    .timestamp()
    .sample(1, TimeUnit.SECONDS)
    .map(ts -> ts.getTimestampMillis() - startTime + "ms: " + ts.getValue())
    .take(5)
    .subscribe(System.out::println);

The preceding code snippet will print something similar to the following:

1088ms: 141
2089ms: 284
3090ms: 427
4084ms: 569
5085ms: 712

The first column shows the relative time from subscription to sample emission. You can clearly see that the first sample appears a little bit over one second after subscription (as requested by the sample() operator) and subsequent samples are roughly one second apart. More important, notice what the values are. The interval() operator emits natural numbers starting from zero every seven milliseconds. Thus, by the time the first sample is taken, we can expect about 142 (1,000/7) events to have appeared, where the 142nd value is 141 (zero-based).

Let’s explore an example that’s a little bit more complex. Imagine that you have a list of names that appear with some absolute delays, like so:

Observable<String> names = Observable
    .just("Mary", "Patricia", "Linda",
        "Barbara",
        "Elizabeth", "Jennifer", "Maria", "Susan",
        "Margaret", "Dorothy");

Observable<Long> absoluteDelayMillis = Observable
    .just(0.1, 0.6, 0.9,
        1.1,
        3.3, 3.4, 3.5, 3.6,
        4.4, 4.8)
    .map(d -> (long) (d * 1_000));

Observable<String> delayedNames = names
    .zipWith(absoluteDelayMillis,
        (n, d) -> Observable
            .just(n)
            .delay(d, MILLISECONDS))
    .flatMap(o -> o);

delayedNames
    .sample(1, SECONDS)
    .subscribe(System.out::println);

First, we construct a sequence of names followed by a sequence of absolute delays (in seconds, later mapped to milliseconds). Using the zipWith() operator, we delay() the occurrence of certain names. The sample() operator will periodically (every second) pick the last seen name from the stream within the last period. So, after the first second, we print Linda, followed by Barbara a second later. Between 2,000 and 3,000 milliseconds since subscription, no name appeared, so sample() does not emit anything. Two seconds after Barbara was emitted, we see Susan. sample() forwards completion (and errors, as well) immediately, discarding the last period. If we want to see Dorothy appear as well, we can artificially postpone the completion notification, as is done here:

static <T> Observable<T> delayedCompletion() {
    return Observable.<T>empty().delay(1, SECONDS);
}

//...

delayedNames
    .concatWith(delayedCompletion())
    .sample(1, SECONDS)
    .subscribe(System.out::println);

sample() has a more advanced variant taking Observable as an argument rather than a fixed period. This second Observable (known as sampler) basically dictates when to take a sample from the upstream source: every time sampler emits any value, a new sample is taken (if any new value appeared since the last sample). You can use this overloaded version of sample() to dynamically change the sampling rate or take samples only at very specific points in time. For example, taking a snapshot of some value when a new frame is redrawn or when a key is pressed. A trivial example can simply emulate the fixed period by using the interval() operator:

//equivalent:
obs.sample(1, SECONDS);
obs.sample(Observable.interval(1, SECONDS));
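
As a sketch of dynamic sampling (our own illustration, not from the original example), here is a sampler whose rate changes over time: one sample per second for the first ten seconds, then one sample every five seconds:

//a sketch: sample every second for the first 10 seconds,
//then fall back to one sample every 5 seconds
Observable<Long> sampler = Observable
    .interval(1, SECONDS)
    .take(10)
    .concatWith(Observable.interval(5, SECONDS));

obs.sample(sampler);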

As you can see, there are some subtleties regarding sample()’s behavior. Rather than relying on our understanding of documentation or manual verification, it is great to have automated tests. Testing time-sensitive operators like sample() is covered in the section “Virtual Time”.

sample() has an alias in RxJava called throttleLast(). Symmetrically, there is also the throttleFirst() operator that emits the very first event that appeared in each period. So, applying throttleFirst() instead of sample() in our name stream yields rather expected results:

Observable<String> names = Observable
    .just("Mary", "Patricia", "Linda",
        "Barbara",
        "Elizabeth", "Jennifer", "Maria", "Susan",
        "Margaret", "Dorothy");

Observable<Long> absoluteDelayMillis = Observable
    .just(0.1, 0.6, 0.9,
        1.1,
        3.3, 3.4, 3.5, 3.6,
        4.4, 4.8)
    .map(d -> (long) (d * 1_000));

//...

delayedNames
    .throttleFirst(1, SECONDS)
    .subscribe(System.out::println);

The output looks like this:

Mary
Barbara
Elizabeth
Margaret

Just like sample() (aka throttleLast()), throttleFirst() does not emit any event when no new name appeared between Barbara and Elizabeth.

Buffering Events to a List

Buffering and moving windows are among the most exciting built-in operators offered by RxJava. They both traverse the input stream through a window that captures several consecutive elements and moves forward. On one hand, they allow batching values from an upstream source to handle them more effectively. In practice, they are flexible and versatile tools that allow various aggregations of data on the fly.

The buffer() operator aggregates batches of events in real time into a List. However, unlike the toList() operator, buffer() emits several lists, each grouping some number of subsequent events, as opposed to just one list containing all events. The simplest form of buffer() groups values from the upstream Observable into lists of equal size:

Observable
    .range(1, 7) //1, 2, 3, ... 7
    .buffer(3)
    .subscribe((List<Integer> list) -> {
        System.out.println(list);
    });

Of course, subscribe(System.out::println) would work as well; we left the type information for educational purposes. The output shows three events emitted from the buffer(3) operator:

[1, 2, 3]
[4, 5, 6]
[7]

buffer() keeps receiving upstream events and buffers them (hence the name) internally until the buffer reaches a size of 3. When that happens, the entire buffer (List<Integer>) is pushed downstream. When the completion notification appears and the internal buffer is not empty (but not yet of size 3), it is pushed downstream anyway. That is the reason we see a one-element list at the end.

By using the buffer(int) operator, you can replace several fine-grained events with fewer but bigger batches. For example, if you want to reduce database load, you might replace storing each event individually with storing them in batches:

interface Repository {
    void store(Record record);
    void storeAll(List<Record> records);
}

//...

Observable<Record> events = //...

events
    .subscribe(repository::store);

//vs.

events
    .buffer(10)
    .subscribe(repository::storeAll);

The latter subscription calls storeAll on Repository, storing batches of 10 elements at once. This can potentially improve throughput in your application.
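
Keep in mind that buffer(10) emits a batch only when 10 elements have accumulated, so on a slow stream the last few events can linger in the buffer for a long time. A minimal sketch (assuming the same events and repository as above) uses an overloaded variant that accepts both a time period and a maximum size, flushing whichever limit is reached first:

events
    .buffer(1, SECONDS, 10) //flush after one second or 10 elements
    .subscribe(repository::storeAll);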

buffer() has many overloaded variants. A slightly more complex version allows you to configure how many of the oldest values from the internal buffer to drop when buffer() pushes the list downstream. That sounds complex, but put in more basic terms, it makes it possible for you to look at your event stream through a moving window of a certain size:

Observable
    .range(1, 7)
    .buffer(3, 1)
    .subscribe(System.out::println);

This yields several overlapping lists:

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7]
[7]

You can use the buffer(N, 1) variant if you want to compute a moving average of some time-series data. The code example that follows generates 1,000 random values from a normal distribution. Later, we take a sliding window of 100 elements (advancing one element at a time) and compute the average of each such window. Run this program yourself and notice how the moving average is much smoother than the raw random values.

import java.util.Random;
import java.util.stream.Collectors;

//...

Random random = new Random();
Observable
    .defer(() -> Observable.just(random.nextGaussian()))
    .repeat(1000)
    .buffer(100, 1)
    .map(this::averageOfList)
    .subscribe(System.out::println);

//...

private double averageOfList(List<Double> list) {
    return list
        .stream()
        .collect(Collectors.averagingDouble(x -> x));
}

You can probably imagine that calling buffer(N) is in fact equivalent to buffer(N, N): the simplest form of buffer() discards the entire internal buffer (after emitting it) when it becomes full. Interestingly, the second parameter of buffer(int, int) (which specifies how far the window advances each time a buffer is pushed downstream) can be bigger than the first argument, effectively skipping some elements!

Observable<List<Integer>> odd = Observable
    .range(1, 7)
    .buffer(1, 2);

odd.subscribe(System.out::println);

This setup opens a new one-element buffer on every other element: buffer() forwards the first element, skips the second, forwards the third, skips the fourth, and so on. Effectively, the output is: [1] [3] [5] [7]. Notice that each element in the odd Observable is actually a one-element list. You can use flatMap() or flatMapIterable() to get back a simple Observable<Integer>:

Observable<Integer> odd = Observable
    .range(1, 7)
    .buffer(1, 2)
    .flatMapIterable(list -> list);

flatMapIterable() expects a function that transforms each value in the stream (here, a one-element List<Integer>) into an Iterable. The identity transformation (list -> list) is enough here.
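
Alternatively, a plain flatMap() achieves the same result by turning each one-element list back into an Observable (a minimal sketch):

Observable<Integer> odd = Observable
    .range(1, 7)
    .buffer(1, 2)
    .flatMap(list -> Observable.from(list)); //from(Iterable) unwraps each list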

BUFFERING BY TIME PERIODS

buffer() is actually a broad family of operators. Rather than batching upstream events based on size (so that each batch has the same size), another variant of buffer() batches events by time period. While throttleFirst() and throttleLast() take the first and last event within a given period of time, respectively, one of the overloaded versions of buffer() batches all events in each time period. Coming back to our names example:

Observable<String> names = just(
    "Mary", "Patricia", "Linda", "Barbara", "Elizabeth",
    "Jennifer", "Maria", "Susan", "Margaret", "Dorothy");

Observable<Long> absoluteDelays = just(
    0.1, 0.6, 0.9, 1.1, 3.3,
    3.4, 3.5, 3.6, 4.4, 4.8
).map(d -> (long) (d * 1_000));

Observable<String> delayedNames = Observable.zip(names,
    absoluteDelays,
    (n, d) -> just(n).delay(d, MILLISECONDS)
).flatMap(o -> o);

delayedNames
    .buffer(1, SECONDS)
    .subscribe(System.out::println);

An overloaded version of buffer() that accepts a time period (one second in the preceding example) aggregates all upstream events within that period. Therefore, buffer() collects all events that happened during the first time period, the second time period, and so on:

[Mary, Patricia, Linda]
[Barbara]
[]
[Elizabeth, Jennifer, Maria, Susan]
[Margaret, Dorothy]

The third List<String> is empty because no events appeared in that time frame. One of the use cases for buffer() is counting the number of events in each time period; for example, the number of key events per second:

Observable<KeyEvent> keyEvents = //...

Observable<Integer> eventPerSecond = keyEvents
    .buffer(1, SECONDS)
    .map(List::size);

Luckily, because a one-second period with no events yields an empty list, we do not have gaps in our measurements. However, this is not the most efficient approach, as we will soon discover with the window() operator.

The most comprehensive overload of buffer() allows you to take full control over when this operator begins buffering events and when the buffer should be flushed downstream. In other words, you choose in which periods of time upstream events should be grouped. Imagine that you are monitoring some industrial device that pushes telemetric data very often. The amount of data is overwhelming, so to save some computational capacity, you decided to look only at certain samples. The algorithm follows:

  • During business hours (9:00–17:00), we take 100-millisecond long snapshots every second (processing approximately 10% of data)
  • Outside business hours we look only at 200-millisecond long snapshots taken every 5 seconds (4%)

In other words, once every second (or every 5 seconds) we buffer all events for 100 milliseconds (or 200, respectively) and emit lists of all of the events within that period. This will become clear when you see the entire example.

First, we need an Observable that emits any value whenever we want to begin buffering (grouping) upstream events. This Observable can literally push any type of value, but that is irrelevant because only the timing matters. The fact that we are returning Duration from the java.time package is a coincidence; RxJava does not use this value in any way:

Observable<Duration> insideBusinessHours = Observable
    .interval(1, SECONDS)
    .filter(x -> isBusinessHour())
    .map(x -> Duration.ofMillis(100));

Observable<Duration> outsideBusinessHours = Observable
    .interval(5, SECONDS)
    .filter(x -> !isBusinessHour())
    .map(x -> Duration.ofMillis(200));

Observable<Duration> openings = Observable.merge(
    insideBusinessHours, outsideBusinessHours);

First, using the interval() operator, we generate timer ticks every second but exclude those that are not within business hours. This way, we get a steady clock ticking every second between 9:00 and 17:00. Recall that interval() returns growing natural Long numbers; however, we do not need them, so for convenience we replace them with a fixed Duration of 100 milliseconds. Symmetrical code creates a steady stream of events every 5 seconds between 17:00 and 9:00. If you are curious how isBusinessHour() is implemented, it uses the java.time package:

private static final LocalTime BUSINESS_START = LocalTime.of(9, 0);
private static final LocalTime BUSINESS_END = LocalTime.of(17, 0);

private boolean isBusinessHour() {
    ZoneId zone = ZoneId.of("Europe/Warsaw");
    ZonedDateTime zdt = ZonedDateTime.now(zone);
    LocalTime localTime = zdt.toLocalTime();
    return !localTime.isBefore(BUSINESS_START)
        && !localTime.isAfter(BUSINESS_END);
}

The openings stream merges together the insideBusinessHours and outsideBusinessHours streams. It is basically a trigger that instructs the buffer() operator when to begin collecting samples from upstream rather than discarding them. Whatever value is emitted from the openings stream is entirely irrelevant. But we must also specify when to stop aggregating (buffering) events and push them downstream as one batch in a List. The most obvious solution is to treat each event emitted from the openings stream as a signal to stop the current batch, emit it downstream, and start another batch:

Observable<TeleData> upstream = //...

Observable<List<TeleData>> samples = upstream
    .buffer(openings);

Notice how we pass the carefully crafted openings stream to the buffer() operator. The preceding code example slices the upstream source of TeleData values: the ticking clock of the openings stream batches events from upstream. Within business hours a new batch is created every second; outside business hours, batches group values in five-second periods. Importantly, in this version all events from upstream are preserved because they land in one batch or the other. However, an overloaded version of the buffer() operator also allows marking the end of a batch:

Observable<List<TeleData>> samples = upstream
    .buffer(
        openings,
        duration -> empty()
            .delay(duration.toMillis(), MILLISECONDS));

First, recall that openings is an Observable<Duration>, but the actual value of events from openings is not important; RxJava merely uses each event to start buffering TeleData instances. But this time we have full control over when buffering begins and when the buffer is emitted. The second parameter is an Observable that must complete whenever we want to stop sampling. Completion of this second stream marks the end of a given batch.

Look carefully: the openings stream emits an event every time we would like to start a new batch. For each event emitted from openings, we return a new Observable that should complete some time in the future. So, for example, when the openings stream emits an event with a Duration.ofMillis(100) value, we transform it into an Observable that completes when the given batch should end, after 100 milliseconds.

Notice that in this case some events might be dropped or duplicated in consecutive batches. If the second Observable (the one responsible for marking the end of a given batch) completes before the opening event of the next batch, events within that time gap are discarded by buffer(). This is our case: we begin buffering events every second (or every five seconds outside business hours), but the buffer closes and is forwarded after 100 milliseconds (or 200, respectively). The majority of events fall between buffering periods and therefore are discarded.

The buffer() operator is extremely flexible and quite complex. Make sure that you experiment a little bit with it and understand the preceding example. It is used to smartly batch events from the upstream source to achieve grouping, sampling, or moving window functionality. But because buffer() requires creating an intermediate List before the current buffer is closed and passed downstream, it might unnecessarily put pressure on garbage collection and memory usage. Therefore, the window() operator was introduced.

Moving window

When working with buffer() we build List instances over and over. Why do we build these intermediate Lists rather than somehow consume events on the fly? This is where the window() operator becomes useful. You should prefer window() over buffer() if possible, because the latter is less predictable in terms of memory usage. The window() operator is very similar to buffer(): it has similar overloaded versions, including ones that do the following:

  • Receive an int, grouping events from the source into fixed-size lists
  • Receive a time unit, grouping events within fixed-time periods
  • Receive custom Observables marking the beginning and end of each batch

What is the difference, then? Remember the example that was counting how many events occurred per second in a given source? Let’s take another look at it:

Observable<KeyEvent> keyEvents = //...

Observable<Integer> eventPerSecond = keyEvents
    .buffer(1, SECONDS)
    .map(List::size);

We batch all events from Observable<KeyEvent> that occurred in each second into Observable<List<KeyEvent>>. In the next step, we map each List to its size. This is quite wasteful, especially if the number of events in each second is significant:

Observable<Observable<KeyEvent>> windows = keyEvents.window(1, SECONDS);

Observable<Integer> eventPerSecond = windows
    .flatMap(eventsInSecond -> eventsInSecond.count());

window(), as opposed to buffer(), returns an Observable<Observable<KeyEvent>>. Think about it for a moment. Rather than receiving fixed lists with each one containing one batch (or buffer), we receive a stream of streams. Every time a new batch begins (every one second in the preceding example), a new Observable<KeyEvent> value appears in the outer stream. We can further transform all of these inner streams, but to avoid double-wrapping we use flatMap().

flatMap() receives each batch (an Observable<KeyEvent>) as an argument and is supposed to return another Observable. The count() operator transforms an Observable<T> into an Observable<Integer> that emits just one item representing the number of events in the original Observable. Therefore, for each one-second batch, we produce the number of events that occurred within that second. But there is no internal buffering; the count() operator counts events on the fly as they pass through.

Skipping Stale Events by Using debounce()

buffer() and window() group several events together so that you can process them in batches. sample() picks one fairly arbitrary event once in a while. These operators do not take into account how much time elapsed between events. But in many cases, the event can be discarded if it is shortly followed by another event. For example, imagine a stream of stock prices flowing from a trading platform:

Observable<BigDecimal> prices = tradingPlatform.pricesOf("NFLX");
Observable<BigDecimal> debounced = prices.debounce(100, MILLISECONDS);

debounce() (alias: throttleWithTimeout()) discards all of the events that are shortly followed by another event. In other words, if a given event is not followed by another event within a time window, that event is emitted.

In the preceding example, the prices stream pushes prices of "NFLX" stock every time they change. Prices sometimes change very frequently, dozens of times per second. For each price change, we would like to run some computation that takes a significant amount of time to complete. However, if a new price arrives, the result of this computation becomes irrelevant; it must begin from scratch with the new price. Therefore, we would like to discard events if they are shortly followed (and thus superseded) by a new event.

debounce() waits a little bit (100 milliseconds in the preceding example) just in case a second event appears later on. This process repeats itself, so if a second event appears less than 100 milliseconds after the first one, RxJava will postpone its emission, hoping for a third one to appear. Here, again, you have the option to flexibly control how long to wait on a per-event basis.

For example, you might want to ignore stock price changes if they are followed by an update in less than 100 milliseconds. However, if the price goes above $150, we would like to forward such an update downstream much faster, without hesitation, maybe because some types of events need to be handled straight away; for example, because they are great market opportunities. You can implement this easily by using an overloaded version of debounce():

prices
    .debounce(x -> {
        boolean goodPrice = x.compareTo(BigDecimal.valueOf(150)) > 0;
        return Observable
            .empty()
            .delay(goodPrice ? 10 : 100, MILLISECONDS);
    });

For each new price update x, we apply sophisticated logic (> $150) to figure out whether the price is good. Then, for each such update, we return a unique Observable, which is empty. It does not need to emit any items; what matters is when it completes. For good prices, it emits a completion notification after 10 milliseconds; for other prices, this Observable completes after 100 milliseconds. For each event it receives, the debounce() operator subscribes to this Observable and waits for its completion. If it completes first, the event is passed downstream. Otherwise, if a more recent upstream event appeared in the meantime, the cycle repeats.

In our example, when a price x of $140 appears, the debounce() operator creates a new Observable with completion delayed by 100 milliseconds via the expression we provided. If no events appear before the completion of this inner Observable, the $140 event will be forwarded downstream. However, imagine another price update x of $151 comes along. This time, when the debounce() operator asks us to provide an Observable (called debounceSelector in the API), we return a stream that completes much faster, after 10 milliseconds. So in the case of good prices (greater than $150), we are willing to wait only 10 milliseconds for a subsequent update. If you still struggle to understand how debounce() works, here is a stock price simulator you can try:

Observable<BigDecimal> pricesOf(String ticker) {
    return Observable
        .interval(50, MILLISECONDS)
        .flatMap(this::randomDelay)
        .map(this::randomStockPrice)
        .map(BigDecimal::valueOf);
}

Observable<Long> randomDelay(long x) {
    return Observable
        .just(x)
        .delay((long) (Math.random() * 100), MILLISECONDS);
}

double randomStockPrice(long x) {
    return 100 + Math.random() * 10 +
        (Math.sin(x / 100.0)) * 60.0;
}

The preceding code nicely composes several streams. First, we generate a sequence of long values emitted in fixed 50-millisecond intervals. Then, we delay each event independently by some random value between 0 and 100 milliseconds. Last but not least, we transform infinitely growing long numbers into a sine wave (using Math.sin()) with random jitter. This simulates stock price fluctuation over time.

If you run this stream against the debounce() operator, you will notice that as long as prices are low, events are generally infrequent because we are willing to wait as much as 100 milliseconds for a subsequent event, which often arrives. But when the price goes above $150, the debounce() tolerance goes down to 10 milliseconds, so effectively every good price update is forwarded downstream.
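
Putting the pieces together is just a matter of combining the simulator with the per-event debounce shown earlier (a sketch):

pricesOf("NFLX")
    .debounce(x -> {
        boolean goodPrice = x.compareTo(BigDecimal.valueOf(150)) > 0;
        return Observable
            .empty()
            .delay(goodPrice ? 10 : 100, MILLISECONDS);
    })
    .subscribe(System.out::println);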

AVOID STARVATION IN DEBOUNCE()

It is quite easy to imagine a situation in which the debounce() operator prevents emission of all events because they simply appear too often and there is never a moment of silence:

Observable
    .interval(99, MILLISECONDS)
    .debounce(100, MILLISECONDS)

Such a source will never emit any event because debounce() waits as much as 100 milliseconds to ensure that there is no more recent event. Unfortunately, just 1 millisecond before this timeout, a new event appears, starting debounce’s timer all over again. This leads to an Observable that produces events so often that we may never get to see any of them(!) You can call this a feature, but in practice you might want to see some event from time to time, even in the case of a flood. To prevent such a situation, we must get a little bit creative.

First, we must discover a situation in which no new event appeared for a long time. We already played with the timeout() operator in the section “Timing Out When Events Do Not Occur”, so we know that part is easy:

Observable
    .interval(99, MILLISECONDS)
    .debounce(100, MILLISECONDS)
    .timeout(1, SECONDS);

Now, we at least get an exception signaling an idle upstream source. Bizarrely, it is the opposite — the upstream interval() operator produces events too often, and because of that, debounce() never passes them downstream — but we digress. If events appear too often, we hold them back, waiting for a moment of silence. But if this silence is too long (more than one second), we fail and throw a TimeoutException. Rather than failing permanently, we would actually like to see an arbitrary value from the upstream Observable and continue. The first part of the task is simple:

ConnectableObservable<Long> upstream = Observable
    .interval(99, MILLISECONDS)
    .publish();

upstream
    .debounce(100, MILLISECONDS)
    .timeout(1, SECONDS, upstream.take(1));

upstream.connect();

The timeout() operator has an overloaded version that accepts a fallback Observable upon timeout. Unfortunately, there is a subtle bug here. In case of timeout, we naively take the first encountered item from upstream and then complete. What we really want is to continue emitting events from upstream, still with debounce() support.

Another approach is seemingly better:

upstream
    .debounce(100, MILLISECONDS)
    .timeout(1, SECONDS, upstream
        .take(1)
        .concatWith(
            upstream.debounce(100, MILLISECONDS)));

It looks somewhat OK at first sight. The original source, after applying debounce(), has a timeout. When a timeout occurs, we emit the very first item we encounter and continue with the same source, again applying the debounce() operator. However, after that first timeout, we have switched to a fallback Observable that no longer has the timeout() operator applied. A quick, dirty, and short-sighted fix:

upstream
    .debounce(100, MILLISECONDS)
    .timeout(1, SECONDS, upstream
        .take(1)
        .concatWith(
            upstream
                .debounce(100, MILLISECONDS)
                .timeout(1, SECONDS, upstream)))

Yet again, we forgot to put a fallback Observable in the inner timeout() operator. Enough is enough; you should already see a recurring pattern here. Rather than infinitely repeating the same sequence of upstream → debounce() → timeout() → upstream → …, we can use recursion!

Observable<Long> timedDebounce(Observable<Long> upstream) {
    Observable<Long> onTimeout = upstream
        .take(1)
        .concatWith(defer(() -> timedDebounce(upstream)));
    return upstream
        .debounce(100, MILLISECONDS)
        .timeout(1, SECONDS, onTimeout);
}

The definition of the onTimeout fallback Observable in timedDebounce is tricky. We declare that it first takes one sample event from upstream (the original source), followed by a recursively invoked timedDebounce() method. We must use the defer() operator to avoid infinite recursion. The rest of timedDebounce() basically takes the original upstream source, applies the debounce() operator, and adds the onTimeout fallback. This fallback does the exact same thing: applies debounce(), adds a timeout(), and a fallback — recursively.
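
As a quick sanity check, you can feed timedDebounce() with the flooding interval source from before (a sketch):

timedDebounce(Observable.interval(99, MILLISECONDS))
    .subscribe(x -> System.out.println("Got: " + x)); //events now trickle through despite the flood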

Do not become depressed if you find it difficult to grasp at first. This is a rather complex example showing the power of stream composition together with laziness and recursion. You hardly ever need that level of complexity, but after you grasp how it works, it is quite satisfying. Play around with this code and observe how tiny changes drastically alter the way streams interact with one another.

Backpressure

Backpressure is quite essential to building robust and responsive applications. In essence, it is a feedback channel from the consumer to the producer. The consumer has a certain level of control over how much data it can process at any time. This way, consumers or messaging middleware do not become saturated and unresponsive under high load. Instead, they request fewer messages, letting the producer decide how to slow down.

In every system that exchanges data via message passing (or events for that matter), a problem of consumers not keeping up with producers can arise. Depending on the underlying implementation, it can manifest in different ways. If the communication channel somehow synchronizes producers and consumers (for example, by using ArrayBlockingQueue), the producer is throttled (blocked) when the consumer is not keeping up with the load.

This leads to coupling between the producer and consumer which should otherwise be entirely independent. Message passing typically means asynchronous processing, an assumption that fails when the producer suddenly must wait for the consumer. Even worse, the producer might be a consumer of a different producer higher in the hierarchy, cascading increased latency up.

Conversely, if the medium in between these two parties is unbounded, well… it is still bound by factors over which we have less control. An infinite queue like LinkedBlockingQueue allows the producer to greatly outperform consumers without blocking. That is, until LinkedBlockingQueue consumes all available memory and crashes the entire application.

If the medium is persistent—for example a JMS message broker—the same problem can technically manifest as well with disk space, but this is less probable. Far more common is a situation in which the messaging middleware finds it difficult to manage thousands if not millions of unconsumed messages. Some specialized brokers such as Kafka can technically store hundreds of millions of messages until lagging consumers get a hold of them. But this leads to an enormous increase in latencies, measured as a time between message production and consumption.

Although message-driven systems are generally considered more robust and scalable, the problem of too eager producers remains unsolved. However, there are some efforts to solve this integration issue. Sampling and throttling (using sample() and others) and batching (using window() and buffer()) are manual ways of reducing producer load in RxJava.

When an Observable generates more events than can be consumed, we can apply sampling or batching to increase subscription throughput. Yet a more robust and systematic approach was needed; hence, the Reactive Streams initiative was born out of necessity. This small set of interfaces and semantics aims to formalize the problem and provide a systematic algorithm for producer–consumer coordination, known as backpressure.

Backpressure is a simple protocol that allows the consumer to request how much data it can consume at a time, effectively providing a feedback channel to the producer. Producers honor requests from consumers, avoiding message overflow. Of course, this algorithm can work only with producers that are capable of throttling themselves; for example, when they are backed by a static collection or a source that can be pulled from, such as an Iterator. When the producer has no control over the frequency of the data it produces (the source is external or hot), backpressure cannot help much.

Backpressure in RxJava

Even though Reactive Streams solves a very general problem in a technology-agnostic way, we will focus on RxJava and how it approaches the problem of backpressure. Throughout this chapter we will use the example of continually washing dishes in a small restaurant. Dishes are modeled as large objects with an identifier:

class Dish {
    private final byte[] oneKb = new byte[1_024];
    private final int id;

    Dish(int id) {
        this.id = id;
        System.out.println("Created: " + id);
    }

    public String toString() {
        return String.valueOf(id);
    }
}

The oneKb buffer simulates some extra memory utilization. Dishes are passed to the kitchen by waiters and are modeled as an Observable:

Observable<Dish> dishes = Observable
    .range(1, 1_000_000_000)
    .map(Dish::new);

The range() operator produces new values as fast as it possibly can. So what happens if washing dishes takes a little bit of time and is clearly slower than the pace of production?

Observable
    .range(1, 1_000_000_000)
    .map(Dish::new)
    .subscribe(x -> {
        System.out.println("Washing: " + x);
        sleepMillis(50);
    });

Surprisingly, nothing bad happens. If you study the output, you will notice that range() is perfectly aligned with the subscription:

Created: 1
Washing: 1
Created: 2
Washing: 2
Created: 3
Washing: 3
...
Created: 110
Washing: 110
...

This should not come as a surprise to you. The range() operator is not asynchronous by default, so every item it produces is passed to a Subscriber directly within the context of the same thread. If the Subscriber is slow, it effectively prevents the Observable from producing more elements: range() cannot call onNext() on the Subscriber until the previous call has finished. This is possible because both producer and consumer work in the same thread and are transparently coupled. In some sense, there is an implicit queue between them with a maximum capacity of one.

This is a rendezvous algorithm that we did not anticipate. Imagine a waiter in a restaurant who cannot leave new dishes for cleaning as long as the ones currently being washed are not done. But when a waiter stands still waiting for dish washing to be done, customers are not served. And when they are not served, new customers cannot enter the restaurant. This is how one blocking component can bring the entire system to a standstill. However, in real life there is typically a thread boundary between producer and consumer: the Observable produces events in one thread, whereas the Subscriber consumes them in another:

dishes
    .observeOn(Schedulers.io())
    .subscribe(x -> {
        System.out.println("Washing: " + x);
        sleepMillis(50);
    });

Stop for a moment and think about what could happen, without actually compiling and running the code. One might think that a disaster should occur, because dishes produces events very fast from the range() operator, whereas the Subscriber is quite slow, consuming only 20 dishes per second.

The observeOn() operator keeps consuming events in quick succession, but the Subscriber processes them way too slowly. Therefore, you might conclude that an OutOfMemoryError is unavoidable, with unprocessed events piling up somewhere. Luckily, backpressure saves the day in this case, and RxJava protects us to some degree. The output of the program is somewhat unexpected:

Created: 1
Created: 2
Created: 3
...
Created: 128
Washing: 1
Washing: 2
...
Washing: 128
Created: 129
...
Created: 223
Created: 224
Washing: 129
Washing: 130
...

First, a batch of 128 dishes is produced by range() pretty much instantaneously. Then there is a slow process of washing the dishes, one by one; somehow, the range() operator becomes idle. When the last dish of these 128 is washed, another batch of 96 dishes is produced by range(), followed by another slow round of washing.

Apparently, there must be some clever mechanism that prevents range() from producing too many events, controlled by the subscriber. If you do not see where such a mechanism is deployed, let’s try to implement range() ourselves:

Observable<Integer> myRange(int from, int count) {
    return Observable.create(subscriber -> {
        int i = from;
        while (i < from + count) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            } else {
                return;
            }
        }
        subscriber.onCompleted();
    });
}

Here, we’re using myRange() in the same example together with observeOn():

myRange(1, 1_000_000_000)
    .map(Dish::new)
    .observeOn(Schedulers.io())
    .subscribe(x -> {
            System.out.println("Washing: " + x);
            sleepMillis(50);
        },
        Throwable::printStackTrace
    );

This ends with catastrophe, and we never even get to wash any dish:

Created: 1
Created: 2
Created: 3
...
Created: 7177
Created: 7178
rx.exceptions.MissingBackpressureException
at rx.internal.operators...
at rx.internal.operators...

MissingBackpressureException will be explained later on. For the time being, this should convince you that there is some background mechanism that our custom implementation of range() is lacking.

Built-in Backpressure

For the past several chapters, we watched how events flow downstream from the source Observable, through a sequence of operators, down to a Subscriber. There was never any feedback channel past the subscription request. The moment we invoke subscribe() (which in some sense propagates up), all events and notifications travel down without any apparent feedback loop. This lack of feedback can lead to producers (the uppermost Observable) emitting a number of events that overwhelms the subscriber. As a consequence, your application can crash with OutOfMemoryError or at best become very latent.

Backpressure is a mechanism that allows terminal subscribers, as well as all intermediate operators, to request only a certain number of events from the producer. By default, an upstream cold Observable produces events as fast as it can. But in the presence of such requests coming from downstream, it should in a way “slow down” and produce exactly the number requested. This is the reason behind the magic number of 128 seen with observeOn(). But first, let’s see how the final subscriber can control backpressure.

When subscribing, we have the possibility to implement onNext(), onCompleted(), and onError(). It turns out there is another callback method to implement: onStart().

Observable
    .range(1, 10)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onStart() {
            request(3);
        }

        //onNext, onCompleted, onError follows...
    });

onStart() is invoked by RxJava exactly when you think it should — before any event or notification is propagated to Subscriber. You can technically use a constructor of your Subscriber, but for anonymous inner classes in Java, constructors look really eerie:

.subscribe(new Subscriber<Integer>() {
    {{
        request(3);
    }}

    //onNext, onCompleted, onError follows...
});

But we digress. The request(3) invocation inside a Subscriber instructs the upstream source how many items we are willing to receive at first. Skipping this invocation entirely (or calling request(Long.MAX_VALUE)) is equivalent to requesting as many events as possible.

This is the reason why we must invoke request() very early; otherwise, the stream begins to emit events and we cannot decrease our demand later on. But when we request only three events, the range() operator will obediently stop emitting events temporarily after pushing 1, 2, and 3. Our onNext() callback method will be invoked three times and no more, despite the range() operator not having completed yet. However, we, as a Subscriber, have full control over how much data we want to receive. For example, we might want to request items individually:

Observable
    .range(1, 10)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onStart() {
            request(1);
        }

        @Override
        public void onNext(Integer integer) {
            request(1);
            log.info("Next {}", integer);
        }

        //onCompleted, onError...
    });

This example is a bit silly because it behaves just like an ordinary Subscriber without any backpressure whatsoever, but it illustrates how you can use backpressure. You can probably imagine a Subscriber that prebuffers some number of events and then requests chunks when it finds it convenient. A Subscriber might decide that it wants to wait a little bit before receiving more events, despite being idle; for example, to reduce stress on some downstream dependency. In our restaurant example, the waiter is an Observable<Dish> that keeps pushing new dirty dishes, whereas request(N) is the readiness of the kitchen staff to wash a certain number of dishes. A good waiter should not deliver new dishes without a request from the kitchen staff.
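
For instance, here is a minimal sketch (our own illustration; process() is a hypothetical slow handler) of a Subscriber that requests events in batches of 10 and asks for the next batch only after fully processing the current one:

Observable
    .range(1, 100)
    .subscribe(new Subscriber<Integer>() {
        private int receivedInBatch;

        @Override
        public void onStart() {
            request(10); //initial batch
        }

        @Override
        public void onNext(Integer value) {
            process(value); //hypothetical slow handler
            if (++receivedInBatch == 10) {
                receivedInBatch = 0;
                request(10); //ready for the next batch
            }
        }

        @Override
        public void onCompleted() { }

        @Override
        public void onError(Throwable e) { }
    });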

That being said, calling request(N) directly in client code is rare. More often, the various operators that we put between the source and the final Subscriber take advantage of backpressure to control how much data flows through our pipeline. For example, observeOn() must subscribe to the upstream Observable and schedule each event it receives on a particular Scheduler, such as io(). But what if upstream produces events at such a pace that the underlying Scheduler and Subscriber can no longer keep up? The Subscriber created by the observeOn() operator is backpressure-enabled: it requests only 128 values to begin with.

The upstream Observable, which understands backpressure, emits only the given number of events and remains idle — this is what range() does, for example. When observeOn() finds that this batch of events was successfully processed by the downstream Subscriber, it requests more. This way, despite crossing a thread boundary and the asynchronous nature of both the producer and consumer side, the consumer is never flooded with events.

observeOn() is not the only operator that is backpressure friendly. As a matter of fact, dozens of other operators take advantage of it. For example, zip() buffers only a fixed number of events from each underlying Observable. Thanks to this, zip() is not affected when just one of the zipped streams is very active. The same logic applies to most of the operators we use.

Producers and Missing Backpressure

We already came across a MissingBackpressureException in our custom implementation of range(). What does it actually mean, and how do you interpret this exception? Imagine a Subscriber (yours, but more often one created by some operator) that knows exactly how many items it wants to receive; for example, buffer(N) or take(N).

Another example of such an operator is observeOn(). It must be very strict in that regard: if the upstream Observable pushes more items for some reason, the internal buffer inside observeOn() overflows, and that is signaled with MissingBackpressureException. But why would an upstream Observable push more items than requested? Well, because it simply ignores the request() invocations. Let’s revisit our simple range() reimplementation:

Observable<Integer> myRange(int from, int count) {
    return Observable.create(subscriber -> {
        int i = from;
        while (i < from + count) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            } else {
                return;
            }
        }
        subscriber.onCompleted();
    });
}

The only way to stop it is by unsubscribing, but we do not want to unsubscribe, just slow it down a little bit. Downstream operators know precisely how many events they want to receive, but our source ignores these requests. The low-level mechanism for honoring the requested number of events is the rx.Producer interface, which is plugged in within create(). In the skeleton that follows, OnSubscribeRange is a callback that is executed every time someone subscribes to this Observable. Normally, you would call onNext() directly from within this callback, but not when backpressure is taken into account:

Observable<Integer> myRangeWithBackpressure(int from, int count) {
    return Observable.create(new OnSubscribeRange(from, count));
}

class OnSubscribeRange implements Observable.OnSubscribe<Integer> {

    //constructor...

    @Override
    public void call(final Subscriber<? super Integer> child) {
        child.setProducer(new RangeProducer(child, start, end));
    }
}

class RangeProducer implements Producer {

    @Override
    public void request(long n) {
        //calling onNext() on child subscriber here
    }
}

This is the skeleton of the code you will find in RxJava’s implementation of range(). Implementing Producer is quite a challenging task: it must be stateful, thread-safe, and extremely fast. Thus, we do not normally implement producers ourselves, but it is useful to understand how they work. Backpressure internally turns Rx principles upside down. The Observable produced by range() (and many other built-in operators) no longer pushes data eagerly to Subscribers. Instead, it wakes up and reacts to data requests (request(N) invocations within Subscriber) and only then produces events. Also, it makes sure not to produce more than was requested.

Look how we set a Producer on the child Subscriber — this Producer will later be invoked indirectly within the Subscriber whenever it calls request(). This is how we set up a feedback channel from the Subscriber to the source Observable: an Observable instructs its Subscriber how it can request a certain amount of data. Effectively, the Observable switches from a push to a pull–push model, where clients can optionally request only a limited number of events.

So, what to do if some foreign Observable does not set up such a channel? Well, when RxJava discovers that it’s dealing with a source that does not support backpressure, it can fail with MissingBackpressureException at any time. However, there are operators from the onBackpressure*() family that can simulate backpressure to some extent.

The simplest onBackpressureBuffer() operator unconditionally buffers all upstream events and serves only the requested amount of data to downstream subscribers:

myRange(1, 1_000_000_000)
    .map(Dish::new)
    .onBackpressureBuffer()
    .observeOn(Schedulers.io())
    .subscribe(x -> {
        System.out.println("Washing: " + x);
        sleepMillis(50);
    });

As always, reading from bottom to top: first, subscribe() propagates up to the observeOn() operator. observeOn() must subscribe as well, but it cannot simply begin consuming an arbitrary number of events. Thus, it requests only a fixed number at the beginning (128) to avoid overflowing the io() Scheduler’s queue. The onBackpressureBuffer() operator acts as a guard against sources that ignore backpressure. When it receives request(128) from the downstream Subscriber, it passes the request up and does nothing as long as no more than 128 events flow through it. But when the Observable ignores that request and simply pushes data irrespective of backpressure, onBackpressureBuffer() keeps the overflow in an unbounded internal buffer.

When another request comes from a downstream Subscriber, onBackpressureBuffer() first drains its internal buffer, and only when it is almost empty does it ask upstream for more. This clever mechanism allows observeOn() to work as if myRange() were backpressure-enabled, whereas in reality it is onBackpressureBuffer() that does the throttling. Unfortunately, an infinite internal buffer is not something that you can treat lightheartedly:

Created: 1
Created: 2
Created: 3
Created: 4
Created: 8
Created: 9
Washing: 1
Created: 10
Created: 11
...
Created: 26976
Created: 26977
Washing: 15
Exception in thread "main" java.lang.OutOfMemoryError: ...
Washing: 16
at java.util.concurrent.ConcurrentLinkedQueue.offer...
at rx.internal.operators.OperatorOnBackpressureBuffer...
...

Of course, your mileage may vary, and with smaller events and a sufficient amount of memory, onBackpressureBuffer() can technically work. But in reality, you should never rely on unbounded resources. Neither memory nor your solid-state drive is infinite. Luckily, there is an overloaded version, onBackpressureBuffer(N), that accepts the maximum buffer size:

.onBackpressureBuffer(1000, () -> log.warn("Buffer full"))

The second parameter is optional; it is a callback invoked when the bounded buffer of 1,000 elements is full — when, despite buffering, the Subscriber still cannot process events at a satisfying pace. It does not allow any recovery, so expect MissingBackpressureException immediately following the warning message. But at least we have control over the buffer size, rather than being bound by the limits of the hardware or operating system.

An alternative to onBackpressureBuffer() is onBackpressureDrop(), which simply discards all events that appeared without prior request(). Imagine a waiter in a restaurant who keeps delivering new dishes to the kitchen. onBackpressureBuffer() is a finite/infinite table with dishes waiting to be washed. onBackpressureDrop(), on the other hand, is a waiter who simply throws away dirty dishes if there is no washing capacity at the moment. This is not a very sustainable business model, but at least the restaurant can keep serving clients:

.onBackpressureDrop(dish -> log.warn("Throw away {}", dish))

The callback is optional, and it notifies us every time an event has to be discarded because it appeared without being requested. It is a good idea to keep track of how many events we dropped; this can be an important metric. Finally, there is onBackpressureLatest(), which is quite similar to onBackpressureDrop() but keeps a reference to the very last dropped element, so that in the case of a late request() from downstream, the last value seen from upstream is served.
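
For slowly changing values such as stock prices, onBackpressureLatest() is often a good fit; a quick sketch, reusing the prices stream and the sleepMillis() helper from earlier:

prices
    .onBackpressureLatest()
    .observeOn(Schedulers.io())
    .subscribe(price -> {
        System.out.println("Processing: " + price);
        sleepMillis(50); //slow consumer; stale prices are skipped, the newest is kept
    });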

The onBackpressure*() family of methods is used to bridge between operators and subscribers requesting backpressure and Observables that do not support it. However, it is better to either use or create sources that support it natively.

Honoring the Requested Amount of Data

There are many ways to construct an Observable that supports downstream backpressure requests. The easiest solution is to use built-in factory methods like range() or from(Iterable<T>). The latter creates a source backed by an Iterable but with backpressure built in. This means that such an Observable will not emit all values from the Iterable at once; rather, it will do so gradually as requests flow in from consumers. Note that this does not imply loading all data into a List<T> (extending Iterable<T>) first. Iterable is basically a factory of Iterators, so we can safely load data on the fly.
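
Because Iterable is just a factory of Iterators, the backing data can even be generated lazily. A minimal sketch (our own) of an infinite, lazily evaluated source that still honors backpressure thanks to from():

//Iterable is a functional interface, so a lambda can provide the Iterator
Iterable<Long> lazyNumbers = () -> new Iterator<Long>() {
    private long next = 0;

    @Override
    public boolean hasNext() {
        return true; //infinite source
    }

    @Override
    public Long next() {
        return next++; //value computed on demand
    }
};

Observable<Long> numbers = Observable.from(lazyNumbers);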

An interesting example of a backpressure-enabled Observable is wrapping a ResultSet from JDBC into a stream. Notice that ResultSet is pull-based, just like a backpressure-enabled Observable. But it is not an Iterable or Iterator, so we must first convert it to Iterator<Object[]> — an Object[] is a loosely typed representation of a single row from a database:

public class ResultSetIterator implements Iterator<Object[]> {

    private final ResultSet rs;

    public ResultSetIterator(ResultSet rs) {
        this.rs = rs;
    }

    @Override
    public boolean hasNext() {
        return !rs.isLast();
    }

    @Override
    public Object[] next() {
        rs.next();
        return toArray(rs);
    }
}

The preceding converter is a very simplified version without error handling, extracted from ResultSetIterator, as found in Apache Commons DbUtils open source utility library. This class also provides a simplistic conversion to Iterable<Object[]>:

public static Iterable<Object[]> iterable(final ResultSet rs) {
    return new Iterable<Object[]>() {
        @Override
        public Iterator<Object[]> iterator() {
            return new ResultSetIterator(rs);
        }
    };
}

Having all of these converters in place, we can finally build an Observable<Object[]> backed by a ResultSet, with backpressure support:

Connection connection = //...

PreparedStatement statement =
    connection.prepareStatement("SELECT ...");
statement.setFetchSize(1000);
ResultSet rs = statement.executeQuery();

Observable<Object[]> result =
    Observable
        .from(ResultSetIterator.iterable(rs))
        .doAfterTerminate(() -> {
            try {
                rs.close();
                statement.close();
                connection.close();
            } catch (SQLException e) {
                log.warn("Unable to close", e);
            }
        });

The result Observable supports backpressure out of the box because the built-in from() operator supports it. Therefore, the throughput of the Subscriber is no longer relevant, and we will not see MissingBackpressureException. Notice that setFetchSize() is necessary; otherwise, some JDBC drivers might try to load all records into memory, which is quite inefficient if we want to stream over a large result set.

As we already mentioned, the low-level mechanism for supporting backpressure is a custom implementation of Producer. However, this task is quite error-prone; thus, a helper class was created, namely SyncOnSubscribe. This implementation of Observable.OnSubscribe is pull-based and has backpressure transparently built in.

Let’s begin with the simplest case of a stateless Observable — which is hardly ever found in real life. This type of Observable does not hold any state between onNext() invocations. Even the simplest range() or just() must remember which items were already emitted. One of the few useful Observables without state emits random numbers:

Observable.OnSubscribe<Double> onSubscribe =
    SyncOnSubscribe.createStateless(
        observer -> observer.onNext(Math.random())
    );

Observable<Double> rand = Observable.create(onSubscribe);

The rand Observable is an ordinary Observable that you can transform, combine, and subscribe to. But underneath, it has full-fledged backpressure support. If Subscriber or any other operator in the pipeline requests a limited number of events, this Observable will correctly obey the orders. The only thing we must provide to createStateless() is a lambda expression that is invoked for each requested event; so if downstream calls request(3), this custom expression is invoked three times, assuming that each invocation emits just one event. There is no context (state) in between invocations, thus it is called stateless.
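
Operators propagate such requests as well; for example, take(3) caps the downstream demand at three, so the expression above should be invoked exactly three times (a small sketch):

rand
    .take(3)
    .subscribe(System.out::println); //exactly three random numbers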

Now let’s build a stateful operator. This variation of SyncOnSubscribe allows an immutable state variable that is passed between invocations; each invocation must return a new state value. As an example, we will build an unbounded generator of natural numbers, beginning at zero. Such an operator is actually quite useful if you want to zip an arbitrarily long sequence with monotonically increasing natural numbers. range() would work as well, but it requires providing an upper limit, which is not always practical:

Observable.OnSubscribe<Long> onSubscribe =
    SyncOnSubscribe.createStateful(
        () -> 0L,
        (cur, observer) -> {
            observer.onNext(cur);
            return cur + 1;
        }
    );

Observable<Long> naturals = Observable.create(onSubscribe);

This time we provide two lambda expressions to the createStateful() factory method. The first lazily creates the initial state — zero in this case. The second expression is more important: it is supposed to push one item downstream, somehow based on the current state, and return the new state value. The state is expected to be immutable, thus this method allows returning a new state as opposed to mutating it. You can easily rewrite the naturals Observable so that it returns BigInteger instead, preventing hypothetical overflow (see the sketch after the next snippet). This Observable can produce an infinite number of increasing natural numbers yet fully supports backpressure. This means that it can adjust the speed at which it produces events based on the Subscriber’s preferences. Compare this to a naive implementation that is undeniably much simpler but falls short in the case of slow Subscribers:

Observable<Long> naturals = Observable.create(subscriber -> {
    long cur = 0;
    while (!subscriber.isUnsubscribed()) {
        System.out.println("Produced: " + cur);
        subscriber.onNext(cur++);
    }
});
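
By the way, the BigInteger variant suggested a moment ago is a tiny change (a sketch):

Observable.OnSubscribe<BigInteger> onSubscribe =
    SyncOnSubscribe.createStateful(
        () -> BigInteger.ZERO,
        (cur, observer) -> {
            observer.onNext(cur);
            return cur.add(BigInteger.ONE); //BigInteger never overflows
        }
    );

Observable<BigInteger> bigNaturals = Observable.create(onSubscribe);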

If you prefer a single state variable that mutates while you traverse it (like a ResultSet from JDBC), SyncOnSubscribe has a method for you as well. The following code does not compile due to checked exceptions, but we want to first highlight the overall usage pattern:

ResultSet resultSet = //...

Observable.OnSubscribe<Object[]> onSubscribe = SyncOnSubscribe.createSingleState(
    () -> resultSet,
    (rs, observer) -> {
        if (rs.next()) {
            observer.onNext(toArray(rs));
        } else {
            observer.onCompleted();
        }
    },
    ResultSet::close
);

Observable<Object[]> records = Observable.create(onSubscribe);

There are three callbacks to implement:

  • Generator of state. This lambda is invoked once to produce the state variable that will be passed as an argument to the subsequent expressions
  • Callback that generates the next value, typically based on the state. This callback is free to mutate the state given as the first argument
  • Third callback, invoked on unsubscription. This is the place to clean up the ResultSet

The more complete implementation with error handling looks as follows. Note that errors occurring during unsubscription are really difficult to propagate properly downstream:

Observable.OnSubscribe<Object[]> onSubscribe = SyncOnSubscribe.createSingleState(
    () -> resultSet,
    (rs, observer) -> {
        try {
            rs.next();
            observer.onNext(toArray(rs));
        } catch (SQLException e) {
            observer.onError(e);
        }
    },
    rs -> {
        try {
            //Also close Statement, Connection, etc.
            rs.close();
        } catch (SQLException e) {
            log.warn("Unable to close", e);
        }
    }
);

SyncOnSubscribe is a handy utility that allows you to write backpressure-enabled Observables. It is slightly more complex than Observable.create(), but the benefits of backpressure controlled by each Subscriber are difficult to overestimate. You should avoid using the create() operator directly and instead consider built-in factories like from() or SyncOnSubscribe.

Backpressure is an amazingly powerful mechanism for controlled throttling of Observables by Subscribers. The feedback channel obviously brings some overhead, but the advantages of loosely coupled yet managed producers and consumers are enormous. Requests are often batched, so the overhead is minimal, but if a Subscriber is really slow (even briefly), this slowness is immediately reflected and the overall system stability is preserved. Missing backpressure can be mitigated to some extent by using the onBackpressure*() family of methods, but not in the long term.

When creating your Observables, think about correctly handling backpressure requests. After all, you have no control over the throughput of Subscribers. Another technique is to avoid heavyweight work in the Subscriber, instead off-loading it to flatMap(). For example, rather than storing events in a database within subscribe(), like this:

source.subscribe(this::store);

Consider making store more reactive (let it return an Observable<UUID> of the saved record) and subscribing only to trigger the subscription and side effects:

source
    .flatMap(this::store)
    .subscribe(uuid -> log.debug("Stored: {}", uuid));

Or even further, batch UUIDs to reduce logging framework overhead:

source
    .flatMap(this::store)
    .buffer(100)
    .subscribe(
        hundredUuids -> log.debug("Stored: {}", hundredUuids));

By avoiding long-running work in subscribe(), we reduce the need for backpressure, but it is still a good idea to think about it in advance. Consult the JavaDocs for an indication as to whether an operator supports backpressure. If such information is missing, most likely the operator is not affected by backpressure in any way, like map().

Summary

One important takeaway from this chapter is to avoid Observable.create() and manually emitting events. If you must implement an Observable yourself, consider the many factory methods that support backpressure for you. Also, pay attention to your domain; maybe you can safely skip or batch incoming events to reduce the overall load on the consuming side.