Reactive Programming with RxJava: Case Studies

This chapter shows examples of selected use cases of RxJava in real-life applications. The API of Reactive Extensions is very powerful but there must be a source of Observables somewhere. Creating an Observable from scratch can be challenging due to backpressure and the Rx contract, which must be followed. The good news is that there are many libraries and frameworks out there that support RxJava natively. Also RxJava turned out to be very useful on some platforms that are inherently asynchronous.

Throughout this chapter, you will see how RxJava improves the design and enhances the capabilities of existing architectures. We will also explore more complex topics that can arise when deploying reactive applications to production, such as memory leaks. When you’ve finished this chapter, you should be convinced that RxJava is mature and versatile enough to implement a variety of use cases in real, modern applications.

Android Development with RxJava

RxJava is very popular among Android developers. First, graphical user interfaces are inherently event driven, with events coming from various actions like key presses or mouse movements. Second, Android, just like Swing or many other GUI environments, is very unforgiving when it comes to threads. The main Android thread must not be blocked to avoid freezing the user interface; however, all updates to the user interface must happen in that main thread. These issues will be addressed in “Schedulers in Android”. But if there is just one thing you should learn about RxJava on Android, make it the next section, which explains memory leaks and how to avoid them easily.

Avoiding Memory Leaks in Activities

One pitfall unique to Android is the Activity-related memory leak. It happens when an Observer holds a strong reference to any GUI component that in turn references the entire parent Activity instance. When you rotate the screen of your mobile device or press the back button, the Android operating system destroys the current Activity and eventually tries to garbage collect it. Activities are fairly large objects, so eagerly cleaning them up is important. However, if your Observer holds a reference to such an Activity, it might never be garbage-collected, leading to a memory leak and, eventually, the device killing your application in its entirety. Take the following innocent code:

public class MainActivity extends AppCompatActivity {

    private final byte[] blob = new byte[32 * 1024 * 1024];

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        TextView text = (TextView) findViewById(R.id.textView);
        Observable
                .interval(100, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(x -> {
                    text.setText(Long.toString(x));
                });
    }
}

The blob field is there just to speed up the memory-leak effects; imagine MainActivity being quite a complex tree of objects, instead. This simple application superficially looks fine. Every 100 milliseconds it updates a text field with the current counter value. But if you rotate your device a couple of times, it crashes with OutOfMemoryError, seemingly out of nowhere. Here is what happens:

  1. MainActivity is created, and during onCreate() we subscribe to interval()
  2. Every 100 milliseconds, we update text with the current counter value. Ignore the mainThread() Scheduler for a second; it will be explained in “Schedulers in Android”
  3. The device changes orientation
  4. MainActivity is destroyed, a new one is created, and onCreate() is executed again
  5. We currently have two Observable.interval() running because we never unsubscribed from the first one

The fact that we have two intervals running at the same time, the first one being a leftover from the destroyed Activity, is not even the worst part. The interval() operator uses a background thread (via the computation() Scheduler) to emit counter events. These events are subsequently propagated to Observers, one of which holds a reference to a TextView that in turn holds a reference to the old MainActivity.

The thread emitting interval() events becomes a new GC root; therefore, everything it references directly or indirectly is not eligible for garbage collection. As a result, even though the first instance of MainActivity was destroyed, it cannot be garbage-collected and the memory of our blob cannot be reclaimed. Every change of orientation (or whenever Android decides to destroy a particular Activity) makes the leak grow.

The solution is simple: let interval() know when it is no longer needed by unsubscribing from it. Just as it has onCreate(), Android has a callback invoked on destruction, onDestroy():

private Subscription subscription;

@Override
protected void onCreate(Bundle savedInstanceState) {
    //...
    subscription = Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(x -> {
                text.setText(Long.toString(x));
            });
}

@Override
protected void onDestroy() {
    super.onDestroy();
    subscription.unsubscribe();
}

That is all there is to it. When an Observable is created as part of an Activity’s lifecycle, make sure to unsubscribe from it when the Activity is destroyed. Calling unsubscribe() detaches the Observer from the Observable so that it is eligible for garbage collection. Together with the Observer, the entire MainActivity can be collected, as well. Also, the interval() itself will stop emitting events because no one is listening to them. Double win.

When you create multiple Observables together with some Activity, holding a reference to all Subscriptions can become tedious. A CompositeSubscription is a handy container in such cases. Each Subscription can simply be inserted into CompositeSubscription and on destruction we can unsubscribe all of them in one easy step:

private CompositeSubscription allSubscriptions = new CompositeSubscription();

@Override
protected void onCreate(Bundle savedInstanceState) {
    //...
    Subscription subscription = Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(x -> {
                text.setText(Long.toString(x));
            });
    allSubscriptions.add(subscription);
}

@Override
protected void onDestroy() {
    super.onDestroy();
    allSubscriptions.unsubscribe();
}
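If several Activities follow this pattern, the boilerplate can be factored out into a common base class. The following is just a sketch; the RxActivity name and bindToLifecycle() method are ours, not part of RxAndroid or any other library:

public abstract class RxActivity extends AppCompatActivity {

    private final CompositeSubscription subscriptions = new CompositeSubscription();

    // Subclasses register every Subscription tied to the Activity lifecycle here
    protected void bindToLifecycle(Subscription subscription) {
        subscriptions.add(subscription);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // Unsubscribes every registered Subscription in one step
        subscriptions.unsubscribe();
    }
}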

It is worth mentioning that unsubscribing from an Observable that is no longer in use is a good practice in any environment. But on resource-constrained mobile devices, this becomes particularly important. Now that you are aware of the pitfalls of memory management on Android, it is time to redesign your mobile applications. First, we will explore Retrofit, an HTTP client with built-in RxJava support that is particularly popular on mobile environments.

Retrofit with Native RxJava Support

Retrofit is a popular library for making HTTP requests, especially in the Android ecosystem. It is neither Android-specific nor the only choice for an HTTP client. However, because it natively supports RxJava, it is a good choice for mobile applications, whether they are written with RxJava in mind or simply need to handle HTTP calls properly. The main advantage of using RxJava in network-related code is its ability to jump between threads easily.

Retrofit promotes a type-safe way of interacting with RESTful services by asking you to first declare a Java interface without an implementation. This interface is later translated into an HTTP request transparently. For the purpose of the exercise, we will be interacting with the Meetup API, a popular service for organizing events. One of the endpoints returns a list of cities near a given location:

public interface MeetupApi {

    @GET("/2/cities")
    Observable<Cities> listCities(
            @Query("lat") double lat,
            @Query("lon") double lon
    );
}

Retrofit will translate the method call to listCities() into a network call. Under the hood, we will be making an HTTP GET request to the /2/cities?lat=...&lon=... resource. Notice the return type. First, we have the strongly typed Cities rather than a String or a weakly typed map-of-maps. But more important, Cities comes wrapped in an Observable that will emit this object when the response arrives. The Cities class maps most of the fields found in the JSON received from the server (getters and setters omitted):

public class Cities {
    private List<City> results;
}

public class City {
    private String city;
    private String country;
    private Double distance;
    private Integer id;
    private Double lat;
    private String localizedCountryName;
    private Double lon;
    private Integer memberCount;
    private Integer ranking;
    private String zip;
}

Such an approach provides a good balance between abstraction (using high-level concepts like method calls and strongly-typed responses) and low-level details (asynchronous nature of network call). Although HTTP has request-response semantics, we model inevitable latency with Observable so that it is not hidden behind a leaky blocking RPC (remote procedure call) abstraction. Unfortunately, there is quite a bit of glue code that you must configure in order to interact with this particular API. Your mileage may vary, but it is important to see the steps required to properly parse the JSON response:

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setPropertyNamingStrategy(
        PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
objectMapper.configure(
        DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.meetup.com/")
        .addCallAdapterFactory(
                RxJavaCallAdapterFactory.create())
        .addConverterFactory(
                JacksonConverterFactory.create(objectMapper))
        .build();

Having an instance of Retrofit, we can finally synthesize a MeetupApi implementation to be used throughout the client code:

MeetupApi meetup = retrofit.create(MeetupApi.class);

At last, with our MeetupApi we can make some HTTP requests and use the power of RxJava. Let’s build a more comprehensive example. Using the Meetup API, we first grab a list of all cities and towns near a given location:

double warsawLat = 52.229841;
double warsawLon = 21.011736;

Observable<Cities> cities = meetup.listCities(warsawLat, warsawLon);
Observable<City> cityObs = cities
        .concatMapIterable(Cities::getResults);
Observable<String> map = cityObs
        .filter(city -> city.distanceTo(warsawLat, warsawLon) < 50)
        .map(City::getCity);
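The distanceTo() method does not appear in the generated City class shown earlier. A minimal sketch of one possible implementation, assuming lat/lon are degrees and the result should be a great-circle distance in kilometers, could be added to City like this:

// Hypothetical helper on City; haversine formula, result in kilometers
public double distanceTo(double otherLat, double otherLon) {
    double earthRadiusKm = 6371.0;
    double dLat = Math.toRadians(otherLat - lat);
    double dLon = Math.toRadians(otherLon - lon);
    double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
            + Math.cos(Math.toRadians(lat)) * Math.cos(Math.toRadians(otherLat))
            * Math.sin(dLon / 2) * Math.sin(dLon / 2);
    return 2 * earthRadiusKm * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
}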

First, we expand an Observable<Cities> with just one item into an Observable<City> with one item per found city using concatMapIterable(). Then, we keep only the cities closer than 50 kilometers to the initial location. Finally, we extract the city name. Our next goal is to find the population of each city found in the vicinity of Warsaw to see how many people live within a radius of 50 kilometers. To achieve that, we must consult another API delivered by GeoNames. One of its methods searches for a location by name and, among other attributes, returns its population. We will again use Retrofit to connect to that API:

public interface GeoNames {

    @GET("/searchJSON")
    Observable<SearchResult> search(
            @Query("q") String query,
            @Query("maxRows") int maxRows,
            @Query("style") String style,
            @Query("username") String username);
}

A JSON object must be mapped to data objects (getters and setters omitted):

class SearchResult {
    private List<Geoname> geonames = new ArrayList<>();
}

public class Geoname {
    private String lat;
    private String lng;
    private Integer geonameId;
    private Integer population;
    private String countryCode;
    private String name;
}

The way to instantiate GeoNames is similar to MeetupApi:

GeoNames geoNames = new Retrofit.Builder()
        .baseUrl("http://api.geonames.org")
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .addConverterFactory(JacksonConverterFactory.create(objectMapper))
        .build()
        .create(GeoNames.class);

Suddenly, our sample application uses two different APIs and mashes them together in a uniform way. For each city name, we would like to consult the GeoNames API and extract the population:

Observable<Long> totalPopulation = meetup
        .listCities(warsawLat, warsawLon)
        .concatMapIterable(Cities::getResults)
        .filter(city -> city.distanceTo(warsawLat, warsawLon) < 50)
        .map(City::getCity)
        .flatMap(geoNames::populationOf)
        .reduce(0L, (x, y) -> x + y);

If you think about it for a while, the preceding program is doing quite a lot of work in this concise form. First it asks MeetupApi for a list of cities, and later, for each city, it fetches the population. Population responses (possibly arriving asynchronously) are later totaled using reduce(). In the end, this whole computational pipeline ends up as an Observable<Long>, emitting one long value once the population from all cities has been accumulated.
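To actually run the pipeline, somebody still has to subscribe. A minimal sketch of consuming the result (printing happens on whichever thread delivered the last response):

totalPopulation.subscribe(
        total -> System.out.println("Total population: " + total),
        throwable -> throwable.printStackTrace());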

This shows the true power of RxJava, how streams from different sources can be seamlessly combined. For example, the populationOf() method is actually quite a complex chain of operators making an HTTP request to GeoNames and extracting population by city name:

public interface GeoNames {

    default Observable<Integer> populationOf(String query) {
        return search(query)
                .concatMapIterable(SearchResult::getGeonames)
                .map(Geoname::getPopulation)
                .filter(p -> p != null)
                .singleOrDefault(0)
                .doOnError(th ->
                        log.warn("Falling back to 0 for {}", query, th))
                .onErrorReturn(th -> 0)
                .subscribeOn(Schedulers.io());
    }

    default Observable<SearchResult> search(String query) {
        return search(query, 1, "LONG", "some_user");
    }

    @GET("/searchJSON")
    Observable<SearchResult> search(
            @Query("q") String query,
            @Query("maxRows") int maxRows,
            @Query("style") String style,
            @Query("username") String username
    );
}

A generic search() method at the bottom is wrapped using default methods so that it is easier to use. After receiving a SearchResult object wrapped in JSON, we unwrap all individual search results, make sure the population was not absent in the response, and in case of any errors we simply return 0.

Finally, we make sure each population request is invoked on an io() scheduler to allow better concurrency. subscribeOn() is actually crucial here. Without it, every request for population for each city would be sequential, drastically increasing the overall latency. However, because for each city flatMap() will invoke the populationOf() method and subscribe to it when needed, data about each city is fetched concurrently.

In fact, we can add a timeout() operator to each population request as well, to achieve an even better response time at the cost of incomplete data. Without RxJava, implementing this scenario would require a lot of manual thread-pool integration. Even with CompletableFuture the task is nontrivial. Yet RxJava, with its noninvasive concurrency and powerful operators, makes it possible to write code that is fast, concise, and easy to understand.
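For example, a per-city timeout with a neutral fallback might look like the following sketch; the one-second value is an arbitrary choice of ours, and onErrorReturn() turns a timed-out lookup into a zero contribution:

Observable<Long> totalPopulationWithTimeout = meetup
        .listCities(warsawLat, warsawLon)
        .concatMapIterable(Cities::getResults)
        .filter(city -> city.distanceTo(warsawLat, warsawLon) < 50)
        .map(City::getCity)
        .flatMap(city -> geoNames
                .populationOf(city)
                .timeout(1, TimeUnit.SECONDS)   // give up on slow responses
                .onErrorReturn(th -> 0))        // incomplete data rather than a failed total
        .reduce(0L, (x, y) -> x + y);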

Combining two different APIs, both driven by Retrofit, works like a charm. However, there is nothing that prevents us from combining entirely unrelated Observables; for example, one coming from Retrofit, another from a JDBC call, and yet another one receiving a JMS message. All these use cases are fairly easy to implement, neither leaking the abstraction nor giving too many details about the nature of the underlying stream implementation.

Schedulers in Android

One of the very first mistakes that every Android developer makes is blocking the UI thread. On Android, there is one designated main thread that interacts bidirectionally with the user interface (UI). Callbacks from native widgets invoke our handlers on the main thread, and all widget updates (changing labels, drawing) must also occur within that thread. This restriction greatly simplifies the UI’s internal architecture but also has serious downsides:

  • Attempting any time-consuming operation (typically a blocking network call) within a callback handling a UI event will prevent other events from being handled, causing the UI to freeze. Eventually, the operating system will kill such a misbehaving application
  • Updating the UI (for example, when a blocking network call completes) must occur on the main thread. We must somehow ask the operating system to invoke the updating code within that main thread

Fortunately, RxJava has built-in mechanisms for both of these concerns. You can run side-effecting tasks in the background using subscribeOn(), whereas jumping back to the main thread is easy with observeOn(). All you need is a special Scheduler that is aware of the Android environment and its main thread, provided by the RxAndroid library:

compile 'io.reactivex:rxandroid:1.1.0'

This small library will add the AndroidSchedulers class to your CLASSPATH, which is essential for writing concurrent code on Android with RxJava. Using the AndroidSchedulers is best explained by means of an example. We would like to make a call to the Meetup API, fetch a list of cities nearby a given location, and then display them:

button.setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View view) {
        meetup
                .listCities(52.229841, 21.011736)
                .concatMapIterable(extractCities())
                .map(toCityName())
                .toList()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        putOnListView(),
                        displayError());
    }
});

On vanilla Android, all transformations and callbacks look as follows:

//Cities::getResults
Func1<Cities, Iterable<City>> extractCities() {
    return new Func1<Cities, Iterable<City>>() {
        @Override
        public Iterable<City> call(Cities cities) {
            return cities.getResults();
        }
    };
}

//City::getCity
Func1<City, String> toCityName() {
    return new Func1<City, String>() {
        @Override
        public String call(City city) {
            return city.getCity();
        }
    };
}

//cities -> listView.setAdapter(...)
Action1<List<String>> putOnListView() {
    return new Action1<List<String>>() {
        @Override
        public void call(List<String> cities) {
            listView.setAdapter(
                    new ArrayAdapter(MainActivity.this, R.layout.list, cities));
        }
    };
}

//throwable -> {...}
Action1<Throwable> displayError() {
    return new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            Log.e(TAG, "Error", throwable);
            Toast.makeText(MainActivity.this,
                    "Unable to load cities",
                    Toast.LENGTH_SHORT)
                    .show();
        }
    };
}

Here is what happens. When a button is clicked, we make an HTTP request via Retrofit. Retrofit produces an Observable<Cities> that we further transform by extracting only relevant information. We end up with List<String> representing nearby cities. This list is eventually displayed on screen.

The use of two schedulers is actually crucial. Without subscribeOn(), Retrofit would use the caller thread to make the HTTP call, causing the Observable to be blocking. This means that the HTTP request would attempt to block the main Android thread, which is immediately detected by the operating system and fails with NetworkOnMainThreadException. The traditional way of running network code in the background is by either creating a new Thread or using an AsyncTask. The advantages of subscribeOn() are obvious: the code is much cleaner and less invasive, and it has built-in declarative error handling via the onError notification.

The observeOn() invocation is equally important. When all transformations are done, only the UI update is performed on the main thread, because we want to carry out as little processing as possible there. Without observeOn() shifting execution to mainThread(), our Observable would attempt to update listView from a background thread, which fails immediately with CalledFromWrongThreadException. Again, observeOn() is much more convenient than postDelayed() from the android.os.Handler class (which AndroidSchedulers.mainThread() uses under the hood).

The flexibility of schedulers combined with the simplicity of the API is very compelling to many Android developers. RxJava offers a simpler, cleaner, but also safer way of tackling the complexity of concurrent programming on mobile devices.

NOTE:

The preceding example has one major flaw that can lead to a memory leak. The Observer keeps a reference to the enclosing Android Activity and can outlive it. This problem was explained and dealt with in “Avoiding Memory Leaks in Activities”.

UI Events as Streams

At the syntax level, RxJava aims to avoid callback hell by replacing nested callbacks with declarative transformations. Therefore, the setOnClickListener() callback enclosing an Observable pipeline looked a bit out of place. Fortunately, there is a library that translates Android UI events into streams. Simply add the following dependency to your project:

compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'

From this point, we can replace an imperative callback registration with a handy pipeline:

RxView
        .clicks(button)
        .flatMap(listCities(52.229841, 21.011736))
        .delay(2, TimeUnit.SECONDS)
        .concatMapIterable(extractCities())
        .map(toCityName())
        .toList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                putOnListView(),
                displayError());

Func1<Void, Observable<Cities>> listCities(final double lat, final double lon) {
    return new Func1<Void, Observable<Cities>>() {
        @Override
        public Observable<Cities> call(Void aVoid) {
            return meetup.listCities(lat, lon);
        }
    };
}

Rather than registering a callback that creates and transforms Observable locally, we begin with Observable<Void> representing button clicks. Clicking a button does not convey any information; thus, it is Void. Each click event triggers an asynchronous HTTP request returning Observable<Cities>. Everything else stays pretty much the same. If you think this is just a minor readability improvement, consider composing multiple GUI event streams.

Imagine that you have two text fields; one for entering latitude and another one for longitude. Any time either of them changes, you would like to make an HTTP request looking for all cities nearby that location. However, to avoid unnecessary network traffic when the user is still typing, we want to implement a certain delay. The network request is initiated only when no changes occurred to any text field for one second. This is very similar to autocomplete text fields that have a slight delay to avoid excessive network usage, but in this case we have to take two inputs together into account. The implementation using RxJava and RxBinding is very elegant:

EditText latText = //...
EditText lonText = //...

Observable<Double> latChanges = RxTextView
        .afterTextChangeEvents(latText)
        .flatMap(toDouble());
Observable<Double> lonChanges = RxTextView
        .afterTextChangeEvents(lonText)
        .flatMap(toDouble());

Observable<Cities> cities = Observable
        .combineLatest(latChanges, lonChanges, toPair())
        .debounce(1, TimeUnit.SECONDS)
        .flatMap(listCitiesNear());

And here are all of the transformations (note how verbose the code is when lambda expressions are not an option):

Func1<TextViewAfterTextChangeEvent, Observable<Double>> toDouble() {
    return new Func1<TextViewAfterTextChangeEvent, Observable<Double>>() {
        @Override
        public Observable<Double> call(TextViewAfterTextChangeEvent event) {
            String s = event.editable().toString();
            try {
                return Observable.just(Double.parseDouble(s));
            } catch (NumberFormatException e) {
                return Observable.empty();
            }
        }
    };
}

//return Pair::new
Func2<Double, Double, Pair<Double, Double>> toPair() {
    return new Func2<Double, Double, Pair<Double, Double>>() {
        @Override
        public Pair<Double, Double> call(Double lat, Double lon) {
            return new Pair<>(lat, lon);
        }
    };
}

//return latLon -> meetup.listCities(latLon.first, latLon.second)
Func1<Pair<Double, Double>, Observable<Cities>> listCitiesNear() {
    return new Func1<Pair<Double, Double>, Observable<Cities>>() {
        @Override
        public Observable<Cities> call(Pair<Double, Double> latLon) {
            return meetup.listCities(latLon.first, latLon.second);
        }
    };
}

First, RxTextView.afterTextChangeEvents() turns the imperative callbacks invoked by EditText whenever its content changes into a stream of events. We create two such streams, for latitude and longitude separately. On the fly, we transform each TextViewAfterTextChangeEvent into a double, silently dropping malformed input.

Having two streams of doubles, we combine them using combineLatest() so that we receive a stream of pairs every time either of the inputs changes. The final piece is debounce(), which waits one second before forwarding such a pair, just in case another edit (of either latitude or longitude) follows shortly. Thanks to debounce(), we avoid unnecessary network calls while the user is typing. The rest of the application remains the same.

This example nicely shows how reactive programming propagates up from Retrofit to user components so that everything in the application becomes a composition of streams. Just make sure that you unsubscribe from afterTextChangeEvents(); failing to do so can lead to a memory leak.
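A minimal sketch of such cleanup, reusing the CompositeSubscription pattern from earlier; showCities() stands for whatever UI handler you use and is not part of any library:

private final CompositeSubscription subscriptions = new CompositeSubscription();

@Override
protected void onCreate(Bundle savedInstanceState) {
    //...
    Subscription citySubscription = cities
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(showCities(), displayError());
    subscriptions.add(citySubscription);
}

@Override
protected void onDestroy() {
    super.onDestroy();
    // Also removes the TextWatchers registered by afterTextChangeEvents()
    subscriptions.unsubscribe();
}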

Querying NoSQL Databases

A typical application these days has two high-latency origins of data: network calls (mostly HTTP) and database queries. Retrofit is a fantastic source of Observables that are backed by an asynchronous HTTP call. When it comes to database access, we spent quite some time looking at SQL databases, which are historically blocking due to the design of the JDBC API.

NoSQL databases are more modern in that regard and often provide asynchronous, nonblocking client drivers. In this section, we will briefly explore the Couchbase and MongoDB drivers, which have native RxJava support and can return an Observable for each external call.

Couchbase Client API

Couchbase Server is a modern document database in the NoSQL family. What is interesting is that Couchbase supports RxJava as a first-class citizen in its client API. Reactive Extensions are not only used as a wrapper but are officially supported and idiomatic when interacting with the database. Many other storage engines have a nonblocking, asynchronous API, but the creators of Couchbase chose RxJava as the best foundation for the client layer.

As an example, let’s query the sample dataset called travel-sample, which happens to have a document with the ID route_14197. In the sample dataset, the route document looks as follows:

{
  "id": 14197,
  "type": "route",
  "airline": "B6",
  "airlineid": "airline_3029",
  "sourceairport": "PHX",
  "destinationairport": "BOS",
  "stops": 0,
  "equipment": "320",
  "schedule": [
    {
      "day": 0,
      "utc": "22:12:00",
      "flight": "B6928"
    },
    {
      "day": 0,
      "utc": "06:40:00",
      "flight": "B6387"
    },
    ...
    {
      "day": 1,
      "utc": "08:16:00",
      "flight": "B6922"
    }
    ...

Every query returns an Observable, and from this point, we can safely transform retrieved records in whatever way we find suitable:

CouchbaseCluster cluster = CouchbaseCluster.create();
cluster
        .openBucket("travel-sample")
        .get("route_14197")
        .map(AbstractDocument::content)
        .map(json -> json.getArray("schedule"))
        .concatMapIterable(JsonArray::toList)
        .cast(Map.class)
        .filter(m -> ((Number) m.get("day")).intValue() == 0)
        .map(m -> m.get("flight").toString())
        .subscribe(flight -> System.out.println(flight));

An AsyncBucket.get() returns an Observable<JsonDocument>. JSON documents are inherently loosely typed, so in order to extract meaningful information we must traverse them with prior knowledge of their structure. Knowing what the document looks like in advance, it is easy to understand the transformations on JsonDocument. Amazingly, RxJava works equally well for the following:

  • Data retrieval, including timeouts, caching, and error handling
  • Data transformation, like extracting, filtering, drilling down into data, and aggregating

This shows the power of the Observable abstraction that you can use in very different scenarios while still exposing the same concise API.
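For example, data-retrieval concerns such as timeouts, retries, and fallbacks compose directly into the same pipeline. A sketch reusing the query from above; the timeout value and fallback strategy are arbitrary choices of ours, implemented with plain Observable operators:

cluster
        .openBucket("travel-sample")
        .get("route_14197")
        .timeout(500, TimeUnit.MILLISECONDS)     // fail fast if the database is slow
        .retry(1)                                // one retry on a transient failure
        .onErrorResumeNext(Observable.empty())   // otherwise fall back to an empty stream
        .map(AbstractDocument::content)
        .subscribe(System.out::println);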

MongoDB Client API

Just like Couchbase, MongoDB allows storing arbitrary JSON-like documents without any predefined schema. The client library has first-class support for RxJava, allowing both asynchronous storing and querying of data. The following example does both of these things. It first inserts 12 documents into the database; as soon as the batch insert is done, it queries them back:

import com.mongodb.rx.client.*;
import org.bson.Document;
import java.time.Month;

MongoCollection<Document> monthsColl = MongoClients
        .create()
        .getDatabase("rx")
        .getCollection("months");

Observable
        .from(Month.values())
        .map(month -> new Document()
                .append("name", month.name())
                .append("days_not_leap", month.length(false))
                .append("days_leap", month.length(true))
        )
        .toList()
        .flatMap(monthsColl::insertMany)
        .flatMap(s -> monthsColl.find().toObservable())
        .toBlocking()
        .subscribe(System.out::println);

The Month class is an enum having values from January to December. Also, we can easily obtain any month’s length in both leap and nonleap years. First, we create twelve BSON (binary JSON) documents, each representing one month with its length. Then we batch insert List<Document> using insertMany() in MongoCollection. This yields an Observable<Success> (the value itself does not contain any meaningful information; it is a singleton). When the Success event appears, we can query the database by calling find().toObservable(). Hopefully, the 12 documents we just inserted are found. Excluding the automatically assigned _id property for clarity, this is what is printed at the very end:

Document{{name=JANUARY, days_not_leap=31, days_leap=31}}
Document{{name=FEBRUARY, days_not_leap=28, days_leap=29}}
Document{{name=MARCH, days_not_leap=31, days_leap=31}}
...

Again, the true power comes from composition. With MongoDB’s RxJava driver, you can easily query multiple collections at the same time and achieve concurrency without really thinking about it much. The code snippet that follows makes two concurrent requests to MongoDB and another one to some pricing service. Note that first() is not an operator on Observable; rather, it is a MongoDB operator that returns an Observable after constructing a query. find() is equivalent to the WHERE clause in SQL, whereas projection() represents SELECT. first() is like LIMIT 1:

Observable<Integer> days = db.getCollection("months")
        .find(Filters.eq("name", APRIL.name()))
        .projection(Projections.include("days_not_leap"))
        .first()
        .map(doc -> doc.getInteger("days_not_leap"));

Observable<Instant> carManufactured = db.getCollection("cars")
        .find(Filters.eq("owner.name", "Smith"))
        .first()
        .map(doc -> doc.getDate("manufactured"))
        .map(Date::toInstant);

Observable<BigDecimal> pricePerDay = dailyPrice(LocalDateTime.now());

Observable<Insurance> insurance = Observable
        .zip(days, carManufactured, pricePerDay,
                (d, man, price) -> {
                    //Create insurance
                });

Technically, you can mix and match any Observables, irrespective of their nature and source. The preceding example makes two queries to MongoDB against two different collections and another query in dailyPrice() that could, for example, return an Observable from Retrofit making an HTTP call.

The bottom line is this: the source of an Observable is irrelevant; you can compose asynchronous computations and requests any way you like. Do you plan on querying multiple databases combined with web services and local file system operations? All of these can run concurrently and be composed together with the same ease, as sketched below. After you grasp how RxJava behaves in general, every source of Observable is the same on the surface.
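As a rough sketch of that idea, the snippet below merges three unrelated sources into one stream of names, reusing meetup and monthsColl from earlier; the names.txt file is a hypothetical local cache of ours:

Observable<String> fromHttp = meetup
        .listCities(warsawLat, warsawLon)
        .concatMapIterable(Cities::getResults)
        .map(City::getCity);                               // Retrofit over HTTP

Observable<String> fromDb = monthsColl
        .find()
        .toObservable()
        .map(doc -> doc.getString("name"));                // MongoDB

Observable<String> fromDisk = Observable
        .fromCallable(() -> Files.readAllLines(Paths.get("names.txt")))
        .flatMapIterable(lines -> lines)
        .subscribeOn(Schedulers.io());                     // local file system

Observable<String> all = Observable.merge(fromHttp, fromDb, fromDisk);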

Java 8 Streams and CompletableFuture

Sometimes there is confusion as to which abstraction to use for concurrent programming, especially since Java 8. There are a few competing APIs that allow you to express asynchronous computations in a clean way. This section compares them to help you choose the right tool for the job. The available abstractions include the following:

CompletableFuture

CompletableFuture, introduced in Java 8, is a much more powerful extension of the well-recognized Future from the java.util.concurrent package. CompletableFuture allows registering an asynchronous callback for when the Future completes or fails, rather than blocking and waiting for the result. But the true strength comes from the composition and transformation capabilities, similar to what Observable.map() and flatMap() offer. Despite being introduced in the standard JDK, not a single class in the standard Java library depends on or uses CompletableFuture. It is perfectly usable but not very well integrated into the Java ecosystem.
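A minimal sketch of that composition style, where fetchTitle() is a hypothetical blocking call of ours:

// supplyAsync() starts the work immediately on ForkJoinPool.commonPool()
CompletableFuture<Integer> length = CompletableFuture
        .supplyAsync(() -> fetchTitle())
        .thenApply(String::trim)                      // roughly Observable.map()
        .thenCompose(title -> CompletableFuture      // roughly Observable.flatMap()
                .supplyAsync(() -> title.length()));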

Parallel Stream

Just like CompletableFuture, streams in java.util.stream were introduced in JDK 8. Streams are a way to declare a sequence of operations like mapping, filtering, and so on prior to execution. All operations on a stream are lazy until a terminal operation is used, like collect() or reduce(). Also, the JDK can automatically parallelize some operations across all available cores, which sounds very compelling. Parallel streams promise transparent mapping, filtering, or even sorting of large datasets on multiple cores. Streams are typically generated from a collection but can just as well be created on the fly and be infinite.

rx.Observable

An Observable represents a stream of events appearing at unpredictable moments in time. It can represent zero, one, a fixed, or an infinite number of events, available immediately or over time. An Observable can terminate with a completion or an error event. You should be fairly comfortable with what Observable is by now.

rx.Single

As RxJava matured, it became apparent that a specialized type representing exactly one result would be beneficial. The Single type is a stream that completes with either exactly one value or an error. In that sense, it is much like CompletableFuture, but Singles are lazy, meaning that they do not begin any computation until someone subscribes.

rx.Completable

Sometimes we invoke a certain computation purely for its side effects, not expecting any result. Sending an email or storing a record in a database are examples of operations that involve I/O (and can therefore benefit from asynchronous processing) but do not return any meaningful result. Traditionally, CompletableFuture<Void> or Observable<Void> was used in such scenarios. However, the even more specific Completable type better expresses the intent of an asynchronous computation without a result. Completable can notify about the completion or failure of a concurrent execution and, just like all other Rx types, it is lazy.
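A small sketch contrasting the laziness of the Rx types with the eagerness of CompletableFuture; sendEmail() is a hypothetical blocking side effect of ours:

// Eager: the side effect starts as soon as the future is created
CompletableFuture<Void> future =
        CompletableFuture.runAsync(() -> sendEmail("hello"));

// Lazy: nothing happens until someone subscribes
Completable completable = Completable
        .fromAction(() -> sendEmail("hello"))
        .subscribeOn(Schedulers.io());

completable.subscribe();   // only now is the email actually sent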

Obviously, there are other ways of expressing asynchronous computation, such as the following:

  • Flux and Mono from project Reactor. These types are somewhat similar to Observable and Single, respectively
  • ListenableFuture from Guava

However, we will keep our list of choices short by limiting it to the JDK and RxJava. Before we continue, let me state that if your application already uses CompletableFuture rather consistently, you should probably stick to it. Some APIs provided by CompletableFuture are a bit awkward, but in general this class delivers quite good support for reactive programming. Moreover, we can expect more and more frameworks to take advantage of it and support it idiomatically. Supporting RxJava in third-party libraries is more difficult because it requires an additional dependency, whereas CompletableFuture is part of the JDK.

Usefulness of Parallel Streams

Let’s shift for a moment and discuss parallel streams from the standard JDK. In Java 8, you can transform a moderately big collection of objects declaratively, with optional parallelism:

List<Person> people = //...

List<String> sorted = people
        .parallelStream()
        .filter(p -> p.getAge() >= 18)
        .map(Person::getFirstName)
        .sorted(Comparator.comparing(String::toLowerCase))
        .collect(toList());

Notice the parallelStream() rather than conventional stream() in the preceding code snippet. By using parallelStream(), we ask for terminal operation like collect() to be performed in parallel rather than sequentially. Of course, this should not have any impact on the result but is supposed to be much faster. Under the hood, what parallelStream() does is split an input collection into multiple chunks, invoke operations on each one of them in parallel, and then combine the results in a divide-and-conquer spirit.

Many operations are very straightforward to parallelize, for example map() and filter(); others, like sorted(), are a bit more difficult because after sorting every chunk separately we must combine the results, which in the case of sorting means merging sorted sequences. Some operations are inherently difficult or impossible to parallelize without further assumptions. For example, reduce() can be parallelized only if the accumulating function is associative.
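Summation is one such associative accumulator; a minimal illustration (the range is arbitrary):

import java.util.stream.IntStream;

// Safe to parallelize: (a + b) + c == a + (b + c), so chunks computed
// on different threads can be combined in any grouping
int sum = IntStream.rangeClosed(1, 1_000_000)
        .parallel()
        .reduce(0, Integer::sum);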

Ideally, on a four-CPU machine we could expect up to four times faster execution, with Amdahl’s law capping the real speedup at the fraction of the work that can actually run in parallel. But parallel streams have their drawbacks. To begin with, for small streams and short pipelines of transformations, the cost of context switching can be significant, to the point at which parallel streams are slower than their sequential counterparts. The problem of too fine-grained concurrency can potentially occur in RxJava as well, which is why it supports declarative concurrency via Schedulers. The situation with parallel streams is different.

Ever wondered why this framework is called parallel and not concurrent streams? Parallel streams were designed only for CPU-intensive work and use a hardcoded thread pool (a ForkJoinPool, to be precise) sized to match the number of CPUs we have. This pool is available statically and globally under ForkJoinPool.commonPool().

Every parallel stream, as well as some CompletableFuture callbacks within the JVM, shares this ForkJoinPool. All parallel streams in the entire JVM (so in multiple applications if you are deploying WAR files onto an application server) share the same small pool. This is generally fine because parallel streams were designed for parallel tasks, which really need the CPU 100% of the time. Thus, if multiple parallel streams are invoked concurrently, they compete for CPU no matter what.
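You can inspect this shared pool at runtime and, if you really must, resize it JVM-wide; a quick check (the parallelism property must be set before the pool is first used):

import java.util.concurrent.ForkJoinPool;

// Usually the number of CPUs minus one
System.out.println(ForkJoinPool.commonPool().getParallelism());

// JVM-wide override, set before the common pool is first touched:
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=8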

But imagine one selfish application running an I/O operation within a parallel stream:

//DON'T DO THIS
people
        .parallelStream()
        .forEach(this::publishOverJms);

publishOverJms() sends a JMS message for each person in a stream. We intentionally chose JMS sending. It seems fast, but due to delivery guarantees a JMS send will most likely touch either the network (to notify the message broker) or the disk (to persist the message locally). This tiny amount of I/O latency is enough to hold the precious ForkJoinPool.commonPool() threads for an excessively long time. Even though this program is barely using the CPU, no other code within the JVM is able to run a parallel stream in the meantime. Now imagine if this were not sending over JMS but retrieving data from a web service or making an expensive database query. parallelStream() should only ever be used for entirely CPU-bound tasks; otherwise, the performance of the whole JVM takes a significant hit.

This does not imply that parallel streams are bad. However, due to the fixed thread pool powering them, they are of limited use. Certainly, parallel streams from the JDK are not a replacement for Observable.flatMap() or other concurrency mechanisms. Parallel streams work best when executed, well… in parallel. But concurrent tasks that do not require the CPU 100% of the time, for example because they block on the network or disk, are better off using other mechanisms, as sketched below.
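With RxJava, the same fan-out can run on a dedicated I/O pool instead of the shared ForkJoinPool; a sketch, reusing publishOverJms() from above:

Observable
        .from(people)
        .flatMap(person -> Observable
                .fromCallable(() -> {
                    publishOverJms(person);   // blocking I/O runs on the io() pool
                    return person;
                })
                .subscribeOn(Schedulers.io()))
        .toList()
        .toBlocking()
        .single();                            // wait for all sends to finish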

Knowing the limitations of streams lets us compare futures and RxJava to see where they fit best.

Choosing the Appropriate Concurrency Abstraction

The closest equivalent to CompletableFuture in RxJava is Single. You can also use Observable, keeping in mind that it can emit any number of values. One big difference between futures and the RxJava types is the laziness of the latter. When you have a reference to a CompletableFuture, you can be sure that the background computation has already begun, whereas Single and Observable will most likely begin to work only when you subscribe to them. Knowing this semantic discrepancy, you can fairly easily interchange CompletableFuture with Observable and Single.

For rare cases in which the result of asynchronous computation is unavailable or irrelevant, CompletableFuture<Void> or Observable<Void> was used. Whereas the former is quite straightforward, the latter might suggest a potentially infinite stream of empty events, whatever that means. Using rx.Single<Void> sounds as bad as a future of Void. Thus, rx.Completable was introduced. Use Completable when your architecture has many operations that have no meaningful result (but might result in an exception). One example of such architecture is command-query separation (CQS) wherein commands are asynchronous and by definition have no result.

When to Choose Observable?

When your application deals with a stream of events appearing over time (e.g., user logins, GUI events, and push notifications), Observable is unbeatable. We never mentioned it, but since version 1.0, Java has offered java.util.Observable, which allows registering Observers and notifying all of them. Yet it lacks the following:

  • Composition capabilities (no operators)
  • Generics (Observer has one update() method taking the Object representing an arbitrary notification payload)
  • Performance (synchronized keyword used everywhere, java.util.Vector internally)
  • Separation of concerns (in some sense, it combines Observable and PublishSubject under the same interface)
  • Concurrency support (all observers are notified sequentially)
  • Immutability

java.util.Observable is the best we can get in standard Java for declarative modeling of events, right after the addListener() methods in the GUI packages. If your domain explicitly mentions events or a flow of data, rx.Observable<T> is hard to beat. The declarative expressiveness combined with a broad family of operators can solve many of the problems you come across. For cold Observables, you can take advantage of backpressure to control the throughput, whereas in the case of hot Observables, you can use the many flow-control operators like buffer().

Memory Consumption and Leaks

RxJava is all about streams of events being processed in memory and on the fly. It provides a consistent, rich API abstracting away the details of the event source. Ideally, we should keep only a very limited, fixed set of events in memory between the producer emitting events and the consumer storing them or forwarding them to another component.

In reality, some components, especially when misused, can consume an unlimited amount of memory. Obviously, memory is limited, and we will eventually encounter either OutOfMemoryError or a never-ending garbage collection loop. This section shows you a few examples of uncontrolled consumption and memory leaks in RxJava and how to prevent them.

Operators Consuming Uncontrolled Amounts of Memory

There are operators that can consume an arbitrary amount of memory, depending only on the nature of your stream. We will look at just a few of them and try to take some safety measures to avoid leaks.

DISTINCT() CACHING ALL SEEN EVENTS

For example, distinct(), by definition, must store all keys encountered since the subscription. The default overload of distinct() compares every event against an internal cache of events seen so far. If the same event (with respect to equals()) has not yet appeared in the stream, it is emitted and added to the cache for the future. This cache is never evicted, to guarantee that the same event never appears again. You can easily imagine that if events are fairly big or frequent, this internal cache will just keep growing, leading to a memory leak.

For the purpose of this demonstration, we will use the following event simulating a big chunk of data:

class Picture {

    private final byte[] blob = new byte[128 * 1024];
    private final long tag;

    Picture(long tag) {
        this.tag = tag;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Picture)) return false;
        Picture picture = (Picture) o;
        return tag == picture.tag;
    }

    @Override
    public int hashCode() {
        return (int) (tag ^ (tag >>> 32));
    }

    @Override
    public String toString() {
        return Long.toString(tag);
    }
}

The following program is executed in a very memory-constrained environment (-mx32M, that is, 32 MB of heap), emitting fairly large events as fast as it can:

Observable
        .range(0, Integer.MAX_VALUE)
        .map(Picture::new)
        .distinct()
        .sample(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);

After running this, OutOfMemoryError appears very quickly because the internal cache of distinct() can no longer hold more Picture instances. The CPU usage shortly before the crash is also quite severe due to the garbage collector being determined to free some space. But even if, rather than using the entire Picture as the key to distinguish events, we use only Picture.tag, the program still crashes, only much later.

This type of leak is even more dangerous. The problem slowly escalates without us noticing, until it finally explodes at the least expected moment, often under high load. To prove that distinct() is the root of the memory leak, check out a similar program that does not use distinct() but instead counts how many events were emitted per second, without any buffering. Your mileage may vary, but you can expect hundreds of thousands of large messages per second processed without much pressure on garbage collection or memory:

Observable
        .range(0, Integer.MAX_VALUE)
        .map(Picture::new)
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(System.out::println);

So how do you avoid memory leaks related to distinct()?

  • Avoid distinct() altogether. As simple as that: this operator is inherently dangerous when used incorrectly
  • Choose your key wisely. Ideally, it should have a finite and small value space. An enum or a byte is OK; a long or a String probably is not. If you cannot prove that a given type will only ever have a very limited number of values (like an enum), you are risking a memory leak
  • Consider distinctUntilChanged() instead, which keeps track of only the last seen event, not all of them
  • Do you really need uniqueness from the very beginning, or can you maybe relax this requirement? Maybe you somehow know that duplicates can appear only within 10 seconds of one another? Then consider running distinct() on a small window:
Observable
        .range(0, Integer.MAX_VALUE)
        .map(Picture::new)
        .window(10, TimeUnit.SECONDS)
        .flatMap(Observable::distinct);

Every 10 seconds we start a new window and ensure that there are no duplicates within that window. The window() operator emits an Observable of all events that occurred within each time window. Unique (with respect to distinct()) values in that window are immediately emitted. When the 10-second window is over, a new window starts, but more importantly, the cache associated with the old window is garbage-collected.

Of course, within these 10 seconds we can still have a critical number of events causing OutOfMemoryError, so it is better to use a window of fixed length (e.g., window(1000)) rather than fixed time. Also, if duplicate events happen to appear right at the end of one window and at the beginning of the next, we will not discover the duplicate. This is a trade-off of which you must be aware.

BUFFERING EVENTS WITH TOLIST() AND BUFFER()

The fact that toList() can consume an unlimited amount of memory is quite obvious. Moreover, using toList() for infinite streams makes no sense. toList() emits just one item upon completion of the upstream source; when completion never arrives, toList() never emits anything. But it will continue to aggregate all events in memory. Using toList() for very long streams is also questionable. You should find a way of consuming the events on the fly, or at least limit the number of upstream events using operators like take().

toList() makes sense when you need to look at all events of a finite Observable at the same time. This is rarely the case: you can apply predicates (like all() and exists()), count items (count()), or reduce them to a single aggregate value (reduce()) without ever needing all events in memory at the same time. One use case could be transforming an Observable<Observable<T>> into an Observable<List<T>> where the inner Observable has a known, fixed length:

.window(100)
.flatMap(Observable::toList)

This is equivalent to the following:

.buffer(100)

Which brings us to buffer(). Before using buffer(), think deeply about whether you really need a List<T> of all events within a time frame. Maybe an Observable<T> is enough. For example, suppose that, given an Observable<Incident>, you need to know whether there were more than five high-priority incidents in each second. You want to produce an Observable<Boolean> that every second emits either true, if a large number of high-priority incidents occurred within that second, or false otherwise. With buffer(), this is quite straightforward:

Observable<Incident> incidents = //...

Observable<Boolean> danger = incidents
        .buffer(1, TimeUnit.SECONDS)
        .map((List<Incident> oneSecond) -> oneSecond
                .stream()
                .filter(Incident::isHighPriority)
                .count() > 5);

However, window() does not require buffering events into an intermediate List but instead forwards them on the fly. window() is equally convenient for the same task but keeps memory usage constant.

Observable<Boolean> danger = incidents
        .window(1, TimeUnit.SECONDS)
        .flatMap((Observable<Incident> oneSecond) ->
                oneSecond
                        .filter(Incident::isHighPriority)
                        .count()
                        .map(c -> c > 5)
        );

Observable actually has a much richer API compared to Stream from the JDK, so you might find yourself converting a Java Collection to an Observable just for the sake of the better operators. For example, streams have no support for a sliding window or zipping.

That being said, you should prefer window() over buffer() when possible, especially when the size of internal List accumulated in buffer() is impossible to predict and manage.

CACHING WITH CACHE() AND REPLAYSUBJECT

The cache() operator is another obvious memory consumer. Even worse than distinct(), cache() keeps a reference to every single event that it ever received from upstream. It makes sense to use cache() for Observables that are known to have fixed, short length. For example, when Observable is used to model an asynchronous response of some component, using cache() is safe and desirable. Otherwise, each Observer will trigger the request again, potentially leading to unanticipated side effects. Conversely, caching long, possibly infinite Observables, especially hot ones, makes very little sense. In the case of hot Observables, you are probably not interested in historic events anyway.

The same story goes for ReplaySubject. Everything you place in such a Subject must be stored so that subsequent Observers get all notifications, not only the future ones. The suggestions for both cache() and ReplaySubject are pretty much the same: if you find yourself using them, it is up to you to guarantee that the source you are caching is finite and relatively short. Also, if possible, try not to keep a reference to a cached Observable for too long, so that it can be garbage-collected after a while.
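If you do need replay semantics for a longer-lived stream, the bounded variants of ReplaySubject cap memory usage; a sketch (the size and duration are arbitrary):

// Keep only the last 100 events for late Observers
ReplaySubject<Picture> lastHundred = ReplaySubject.createWithSize(100);

// Or keep only the events emitted during the last minute
ReplaySubject<Picture> lastMinute =
        ReplaySubject.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation());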

BACKPRESSURE KEEPS MEMORY USAGE LOW

If you try to zip two sources, one of which is even slightly slower than the other, the zip()/zipWith() operators must temporarily buffer events from the faster stream while waiting for corresponding events from the slower one:

Observable<Picture> fast = Observable
        .interval(10, MICROSECONDS)
        .map(Picture::new);
Observable<Picture> slow = Observable
        .interval(11, MICROSECONDS)
        .map(Picture::new);

Observable
        .zip(fast, slow, (f, s) -> f + " : " + s)

You might expect this code to eventually crash with OutOfMemoryError because zip() supposedly keeps its ever-growing buffer of events from fast, waiting for the slow stream. But this is not the case; in fact, we almost immediately get the dreadful MissingBackpressureException. The zip() (and zipWith()) operator does not blindly receive events at whatever throughput the upstream dictates. Instead, these operators take advantage of backpressure and only request as little data as possible. Therefore, if upstream Observables are cold and implemented properly, zip() will simply slow down the faster Observable by requesting less data than it could technically produce.

In the case of interval(), though, the mechanism does not work this way. The interval() operator is cold because it starts its counter only when someone subscribes, and each Observer gets its own independent stream. Yet once we have subscribed to interval(), there is no way of slowing it down; by definition, it must emit events at a fixed frequency. Therefore, it must ignore backpressure requests, possibly leading to MissingBackpressureException. All we can do is drop the excess events:

Observable
        .zip(
                fast.onBackpressureDrop(),
                slow.onBackpressureDrop(),
                (f, s) -> f + " : " + s)

But how is MissingBackpressureException any better than OutOfMemoryError? Well, missing backpressure fails fast, whereas running out of memory builds up slowly, consuming precious memory that could have been allocated elsewhere. Missing backpressure can also strike at the least expected moment, for example when a garbage collection pause temporarily stalls the consumer.
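A middle ground between dropping events and buffering them without limit is a bounded buffer; a sketch (the capacity is an arbitrary choice of ours):

Observable
        .zip(
                fast.onBackpressureBuffer(10_000),   // bounded buffer instead of unbounded growth
                slow.onBackpressureBuffer(10_000),   // overflowing the buffer fails the stream fast
                (f, s) -> f + " : " + s)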

Summary

It is much easier to get started with RxJava when some source of Observables already exists in our codebase. Implementing a new Observable from scratch is error-prone, so the many libraries and frameworks with native RxJava support lower the barrier considerably. In “From Collections to Observables” we slowly refactored an existing application from an imperative, collection-oriented style to a stream-oriented, declarative approach. But after you introduce libraries that are sources of asynchronous Observables, the refactoring becomes much easier. The more streams you have in your application, the further the reactive API propagates up. It begins at the data-acquisition level (database, web service, and so on) and bubbles up to the service and web layers. Suddenly, our entire stack is written reactively. At some point, when the usage of RxJava reaches a certain critical mass in the project, there is no longer a need for toBlocking(), because everything is a stream, top to bottom.