This chapter shows selected use cases of RxJava in real-life applications. The API of Reactive Extensions is very powerful, but there must be a source of Observables somewhere. Creating an Observable from scratch can be challenging due to backpressure and the Rx contract, which must be followed. The good news is that many libraries and frameworks out there support RxJava natively. RxJava has also turned out to be very useful on platforms that are inherently asynchronous.
Throughout this chapter, you will see how RxJava improves the design and enhances the capabilities of existing architectures. We will also explore more complex topics that can arise when deploying reactive applications to production, such as memory leaks. When you’ve finished this chapter, you should be convinced that RxJava is mature and versatile enough to implement a variety of use cases in real, modern applications.
RxJava is very popular among Android developers. First, graphical user interfaces are inherently event driven, with events coming from various actions like key presses or mouse movements. Second, Android, just like Swing and many other GUI environments, is very unforgiving when it comes to threads. The main Android thread must not be blocked, to avoid freezing the user interface; however, all updates to the user interface must happen in that main thread. These issues will be addressed in “Schedulers in Android”. But if there is just one thing you should learn about RxJava on Android, be sure to go through the next section, which explains memory leaks and how to avoid them easily.
One pitfall unique to Android is the Activity-related memory leak. It happens when an Observer holds a strong reference to any GUI component that in turn references the entire parent Activity instance. When you rotate the screen of your mobile device or press the back button, the Android operating system destroys the current Activity and eventually tries to garbage collect it. Activities are fairly large objects, so eagerly cleaning them up is important. However, if your Observer holds a reference to such an Activity, it might never be garbage collected, leading to a memory leak and the device killing your application in its entirety. Take the following innocent code:
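A minimal sketch of such leaky code might look as follows (RxJava 1.x; the layout, view IDs, and blob size are hypothetical, chosen only to illustrate the problem):

```java
public class MainActivity extends Activity {

    // large payload just to make the leak visible sooner
    private final byte[] blob = new byte[32 * 1024 * 1024];

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        final TextView textView = (TextView) findViewById(R.id.textView);
        Observable
                .interval(100, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                // the lambda captures textView, which references this Activity
                .subscribe(counter -> textView.setText(String.valueOf(counter)));
    }
}
```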
The blob field is there just to speed up the memory-leak effects; imagine MainActivity being quite a complex tree of objects instead. This simple application superficially looks fine. Every 100 milliseconds it updates a text field with the current counter value. But if you rotate your device a couple of times, it crashes with OutOfMemoryError for some reason. Here is what happens:
- MainActivity is created, and during onCreate() we subscribe to Observable.interval()
- Every 100 milliseconds, we update the text with the current counter value. Ignore the mainThread() Scheduler for a second; it will be explained in “Schedulers in Android”
- The device changes orientation
- MainActivity is destroyed, a new one is created, and onCreate() is executed again
- We now have two Observable.interval() streams running, because we never unsubscribed from the first one
The fact that we have two intervals running at the same time, the first one being a leftover from the destroyed Activity, is not the worst part. The interval() operator uses a background thread (via the computation() Scheduler) to emit counter events. These events are subsequently propagated to the Observers, one of them holding a reference to a TextView, which in turn holds a reference to the old Activity. The thread emitting interval() events becomes a new GC root; therefore, everything it references directly or indirectly is not eligible for garbage collection. So even though the first instance of MainActivity was destroyed, it cannot be garbage collected and the memory of our blob cannot be reclaimed. Every change of orientation (or whenever Android decides to destroy a particular Activity) increases the memory leak.
The solution is simple: let interval() know when it is no longer needed by unsubscribing from it. Just like onCreate(), Android has a callback invoked on destruction, namely onDestroy():
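The fix might be sketched like this (same hypothetical layout as before): keep the Subscription returned by subscribe() and release it in onDestroy():

```java
private Subscription subscription;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    final TextView textView = (TextView) findViewById(R.id.textView);
    subscription = Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(counter -> textView.setText(String.valueOf(counter)));
}

@Override
protected void onDestroy() {
    super.onDestroy();
    // detach the Observer; the interval stops and the Activity becomes collectible
    subscription.unsubscribe();
}
```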
That is all there is to it. When an Observable is created as part of an Activity’s lifecycle, make sure to unsubscribe from it when the Activity is destroyed. Calling unsubscribe() detaches the Observer from the Observable so that the Observer is eligible for garbage collection. Together with the Observer, the entire MainActivity can be collected as well. Also, interval() itself will stop emitting events because no one is listening to them. Double win.
When you create multiple Observables together with some Activity, holding a reference to all of the Subscriptions can become tedious. CompositeSubscription is a handy container in such cases. Each Subscription can simply be inserted into the CompositeSubscription, and on destruction we can unsubscribe from all of them in one easy step:
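A sketch of that pattern; the two sample streams and their handler methods (updateCounter(), updateClock()) are made up for illustration:

```java
private final CompositeSubscription subscriptions = new CompositeSubscription();

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    // each Subscription is registered in the shared container
    subscriptions.add(Observable
            .interval(100, TimeUnit.MILLISECONDS)
            .subscribe(this::updateCounter));
    subscriptions.add(Observable
            .interval(1, TimeUnit.SECONDS)
            .subscribe(this::updateClock));
}

@Override
protected void onDestroy() {
    super.onDestroy();
    subscriptions.unsubscribe();  // one call tears down every registered Subscription
}
```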
It is worth mentioning that unsubscribing from an Observable that is no longer in use is good practice in any environment. But on resource-constrained mobile devices, it becomes particularly important. Now that you are aware of the pitfalls of memory management on Android, it is time to redesign your mobile applications. First, we will explore Retrofit, an HTTP client with built-in RxJava support that is particularly popular in mobile environments.
Retrofit is a popular library for making HTTP requests, especially in the Android ecosystem. It is neither Android-specific nor the only choice for an HTTP client. However, because it natively supports RxJava, it is a good choice for mobile applications, whether they were written with RxJava in mind or merely want to handle HTTP calls properly. The main advantage of using RxJava in network-related code is its ability to jump between threads easily.
Retrofit promotes a type-safe way of interacting with RESTful services by asking you to first declare a Java interface without an implementation. This interface is later translated into HTTP requests transparently. For the purpose of this exercise, we will interact with the Meetup API, a popular service for organizing events. One of the endpoints returns a list of cities near a given location:
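The interface might be declared as follows; the exact set of query parameters is simplified for illustration:

```java
public interface MeetupApi {

    @GET("/2/cities")
    Observable<Cities> listCities(
            @Query("lat") double lat,
            @Query("lon") double lon);
}
```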
Retrofit will translate a call to listCities() into a network call. Under the hood, we will be making an HTTP GET request to the /2/cities?lat=...&lon=... resource. Notice the return type. First, we have the strongly typed Cities rather than a String or a weakly typed map-of-maps. More important, Cities comes wrapped in an Observable that will emit this object when the response arrives.

The Cities class maps most of the fields found in the JSON received from the server (getters and setters omitted):
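A plausible shape for these data objects, assuming the field names mirror the Meetup JSON response (only a few accessors shown):

```java
import java.util.List;

class Cities {
    private List<City> results;

    List<City> getResults() { return results; }
    void setResults(List<City> results) { this.results = results; }
}

class City {
    private String city;
    private String country;
    private Double distance;  // distance from the queried location
    private Double lat;
    private Double lon;

    String getCity() { return city; }
    void setCity(String city) { this.city = city; }
    Double getDistance() { return distance; }
    void setDistance(Double distance) { this.distance = distance; }
}
```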
Such an approach provides a good balance between abstraction (using high-level concepts such as method calls and strongly typed responses) and low-level details (the asynchronous nature of a network call). Although HTTP has request-response semantics, we model the inevitable latency with Observable so that it is not hidden behind a leaky, blocking RPC (remote procedure call) abstraction. Unfortunately, there is quite a bit of glue code that you must configure in order to interact with this particular API. Your mileage may vary, but it is important to see the steps required to properly parse the JSON response:
Having an instance of Retrofit, we can finally synthesize a MeetupApi implementation to be used throughout the client code:
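One possible wiring, using Retrofit 2 and its RxJava call adapter; the base URL and the converter choice are assumptions, and the book’s exact setup may differ:

```java
Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.meetup.com")
        .addConverterFactory(GsonConverterFactory.create())        // JSON → Cities
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())  // rx.Observable support
        .build();

MeetupApi meetup = retrofit.create(MeetupApi.class);  // synthesized implementation
```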
At last, with our MeetupApi we can make some HTTP requests and use the power of RxJava. Let’s build a more comprehensive example. Using the Meetup API, we first grab a list of all cities and towns near a given location:
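The pipeline described below might look like this; Warsaw’s coordinates and the kilometer unit of the distance field are assumptions:

```java
Observable<String> cityNames = meetup
        .listCities(52.229841, 21.011736)          // Observable<Cities> with one item
        .concatMapIterable(Cities::getResults)     // Observable<City>, one per city
        .filter(city -> city.getDistance() < 50)   // closer than 50 km
        .map(City::getCity);                       // extract just the name
```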
First, we expand an Observable<Cities> with just one item into an Observable<City> with one item per found city, using concatMapIterable(). Then, we keep only the cities closer than 50 kilometers to the initial location. Finally, we extract the city name. Our next goal is to find the population of each city found in the vicinity of Warsaw, to see how many people live within a radius of 50 kilometers. To achieve that, we must consult another API, delivered by GeoNames. One of its methods searches for a location by name and, among other attributes, returns its population. We will again use Retrofit to connect to that API:
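A sketch of such an interface, modeled on the public GeoNames searchJSON endpoint (the SearchResult type is shown next):

```java
public interface GeoNames {

    @GET("/searchJSON")
    Observable<SearchResult> search(
            @Query("q") String query,
            @Query("maxRows") int maxRows,
            @Query("style") String style,
            @Query("username") String username);
}
```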
A JSON object must be mapped to data objects (getters and setters omitted):
The way to instantiate GeoNames is similar to that of MeetupApi. Suddenly our sample application uses two different APIs and mashes them together quite uniformly. For each city name, we would like to consult the GeoNames API and extract the population:
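Combining the two APIs might be sketched as follows; populationOf() is shown later, and Warsaw’s coordinates are again assumed:

```java
Observable<Long> totalPopulation = meetup
        .listCities(52.229841, 21.011736)
        .concatMapIterable(Cities::getResults)
        .filter(city -> city.getDistance() < 50)
        .map(City::getCity)
        .flatMap(cityName -> populationOf(cityName))  // one GeoNames call per city
        .reduce(0L, (total, population) -> total + population);
```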
If you think about it for a while, the preceding program does quite a lot of work in this concise form. First it asks MeetupApi for a list of cities, and then for each city it fetches the population. Population responses (possibly arriving asynchronously) are later totaled using reduce(). In the end, this whole computational pipeline ends up as an Observable<Long>, emitting a single long value once the population from all cities has been accumulated.

This shows the true power of RxJava: streams from different sources can be seamlessly combined. For example, the populationOf() method is actually quite a complex chain of operators making an HTTP request to GeoNames and extracting the population by city name:
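One possible shape of populationOf(); the Geoname data class and the account name are hypothetical:

```java
Observable<Long> populationOf(String cityName) {
    return geoNames
            .search(cityName, 1, "LONG", "rx_example")          // placeholder account
            .concatMapIterable(SearchResult::getGeonames)       // unwrap individual hits
            .filter(geoname -> geoname.getPopulation() != null) // population may be absent
            .map(Geoname::getPopulation)
            .onErrorReturn(error -> 0L)                         // any failure counts as zero
            .subscribeOn(Schedulers.io());                      // run each request concurrently
}
```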
The search() method at the bottom is wrapped using default methods so that it is easier to use. After receiving a SearchResult object unmarshaled from JSON, we unwrap the individual search results, make sure the population was not absent from the response, and in case of any errors we simply return 0. Finally, we make sure each population request is invoked on the io() Scheduler to allow better concurrency. subscribeOn() is actually crucial here. Without it, the population request for each city would run sequentially, drastically increasing the overall latency. However, because flatMap() invokes the populationOf() method for each city and subscribes to it when needed, data about each city is fetched concurrently.
In fact, we can also add a timeout() operator to each population request to achieve an even better response time at the cost of incomplete data. Without RxJava, implementing this scenario would require a lot of manual thread-pool integration. Even with CompletableFuture the task is nontrivial. Yet RxJava, with its noninvasive concurrency and powerful operators, makes it possible to write code that is fast, concise, and easy to understand.
Combining two different APIs, both driven by Retrofit, works like a charm. However, nothing prevents us from combining entirely unrelated Observables; for example, one coming from Retrofit, another from a JDBC call, and yet another one receiving a JMS message. All these use cases are fairly easy to implement, neither leaking the abstraction nor revealing too many details about the nature of the underlying stream implementation.
One of the very first mistakes that every Android developer makes is blocking the UI thread. On Android there is one designated main thread that interacts bidirectionally with the user interface (UI). Callbacks from native widgets invoke our handlers on the main thread, but widget updates (changing labels, drawing) must also occur within that thread. This restriction greatly simplifies the UI's internal architecture but also has serious downsides:
- Attempting any time-consuming operation (typically a blocking network call) within a callback handling a UI event will prevent other events from being handled, causing the UI to freeze. Eventually, the operating system will kill such a misbehaving application
- Updating the UI (for example, when a blocking network call completes) must occur on the main thread. We must somehow ask the operating system to invoke the updating code within that main thread
Amazingly, RxJava has two built-in mechanisms for both. You can run side-effecting tasks in the background using subscribeOn(), whereas jumping back to the main thread is easy with observeOn(). All you need is a special Scheduler that is aware of the Android environment and its main thread. A small library, RxAndroid, adds the AndroidSchedulers class to your CLASSPATH, which is essential for writing concurrent code on Android with RxJava. Using AndroidSchedulers is best explained by means of an example. We would like to make a call to the Meetup API, fetch a list of cities near a given location, and then display them:
On vanilla Android, all transformations and callbacks look as follows:
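A sketch of such a callback, under the same assumptions about views and coordinates as before:

```java
Button button = (Button) findViewById(R.id.button);
button.setOnClickListener(view -> meetup
        .listCities(52.229841, 21.011736)
        .concatMapIterable(Cities::getResults)
        .map(City::getCity)
        .toList()                                   // List<String> of nearby cities
        .subscribeOn(Schedulers.io())               // HTTP off the main thread
        .observeOn(AndroidSchedulers.mainThread())  // UI update back on the main thread
        .subscribe(cities -> listView.setAdapter(
                new ArrayAdapter<>(MainActivity.this,
                        android.R.layout.simple_list_item_1, cities))));
```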
Here is what happens: when the button is clicked, we make an HTTP request via Retrofit. Retrofit produces an Observable<Cities> that we further transform by extracting only the relevant information. We end up with a List<String> representing nearby cities. This list is eventually displayed on screen.
The use of two Schedulers is actually crucial. Without subscribeOn(), Retrofit would use the caller thread to make the HTTP call, causing the Observable to become blocking. This means that the HTTP request would attempt to block the main Android thread, which is immediately detected by the operating system and fails with NetworkOnMainThreadException. The traditional way of running network code in the background is by either creating a new Thread or using AsyncTask. The advantages of subscribeOn() are obvious: the code is much cleaner and less invasive, and it has built-in declarative error handling via the onError() notification.
The observeOn() invocation is equally important. When all transformations are done, we invoke the UI update on the main thread, because we want to carry out as little processing as possible there. Without the observeOn() that shifts execution to AndroidSchedulers.mainThread(), the Observable would attempt to update listView from a background thread, which fails immediately with CalledFromWrongThreadException. observeOn() is much more convenient than postDelayed() from the android.os.Handler class (which AndroidSchedulers.mainThread() uses under the hood).
The flexibility of Schedulers, combined with the simplicity of the API, is very compelling to many Android developers. RxJava offers a simpler, cleaner, but also safer way of tackling the complexity of concurrent programming on mobile devices.
The preceding example has one major flaw that can lead to a memory leak: the Observer keeps a reference to the enclosing Android Activity and can outlive it. This problem was explained and dealt with in “Avoiding Memory Leaks in Activities”.
At the syntax level, RxJava aims to avoid callback hell by replacing nested callbacks with declarative transformations. Therefore, registering a click listener imperatively only to build and subscribe to an Observable inside it looked a bit disturbing. Fortunately, there is a library that translates Android UI events into streams. Simply add the following dependency to your project:
From this point, we can replace an imperative callback registration with a handy pipeline:
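With RxBinding, the same flow might start from the click stream itself; displayCities() is a hypothetical helper that populates the list view:

```java
RxView.clicks(button)                               // Observable<Void> of clicks
        .flatMap(click -> meetup
                .listCities(52.229841, 21.011736)
                .subscribeOn(Schedulers.io()))
        .concatMapIterable(Cities::getResults)
        .map(City::getCity)
        .toList()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(cities -> displayCities(cities));
```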
Rather than registering a callback that creates and transforms an Observable locally, we begin with an Observable<Void> representing button clicks. Clicking a button does not convey any information; thus, it is Void. Each click event triggers an asynchronous HTTP request returning Observable<Cities>. Everything else stays pretty much the same. If you think this is just a minor readability improvement, consider composing multiple GUI event streams.
Imagine that you have two text fields; one for entering latitude and another one for longitude. Any time either of them changes, you would like to make an HTTP request looking for all cities nearby that location. However, to avoid unnecessary network traffic when the user is still typing, we want to implement a certain delay. The network request is initiated only when no changes occurred to any text field for one second. This is very similar to autocomplete text fields that have a slight delay to avoid excessive network usage, but in this case we have to take two inputs together into account. The implementation using RxJava and RxBinding is very elegant:
Here are both input fields and all transformations (note how verbose the code is when lambda expressions are not an option):
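With lambdas, the whole pipeline might be sketched like this; latText and lonText are the two hypothetical EditText fields, and displayCities() is a made-up helper:

```java
Observable<Double> coordinateChanges(EditText field) {
    return RxTextView
            .afterTextChangeEvents(field)
            .map(event -> event.view().getText().toString())
            .flatMap(s -> {
                try {
                    return Observable.just(Double.parseDouble(s));
                } catch (NumberFormatException e) {
                    return Observable.empty();  // silently drop malformed input
                }
            });
}

Observable
        .combineLatest(
                coordinateChanges(latText),
                coordinateChanges(lonText),
                (lat, lon) -> meetup.listCities(lat, lon))
        .debounce(1, TimeUnit.SECONDS)          // wait for one quiet second
        .flatMap(request -> request.subscribeOn(Schedulers.io()))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(cities -> displayCities(cities));
```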
RxTextView.afterTextChangeEvents() translates the imperative callbacks invoked by EditText whenever the content changes. We create two such streams, for latitude and longitude separately. On the fly, we transform each TextViewAfterTextChangeEvent into a double, silently dropping malformed inputs. Having two streams of doubles, we combine them using combineLatest() so that we receive a stream of pairs every time either of the inputs changes. The final piece is debounce(), which waits one second before forwarding such a pair, just in case another edit (of either latitude or longitude) follows shortly. Thanks to debounce(), we avoid unnecessary network calls while the user is typing. The rest of the application remains the same.
This example nicely shows how reactive programming propagates up from Retrofit to the user-facing components, so that everything in the application becomes a composition of streams. Just make sure that you unsubscribe from afterTextChangeEvents(); failing to do so can lead to a memory leak.
A typical application these days has two high-latency sources of data: network calls (mostly HTTP) and database queries. Retrofit is a fantastic source of Observables backed by asynchronous HTTP calls. When it comes to database access, we spent quite some time looking at SQL databases, which are historically blocking due to the JDBC API design. NoSQL databases are more modern in that regard and often provide asynchronous, nonblocking client drivers. In this chapter, we will briefly explore the Couchbase and MongoDB drivers, which have native RxJava support and can return an Observable for each external call.
Couchbase Server is a modern document database in the NoSQL family. What is interesting is that Couchbase supports RxJava as a first-class citizen in its client API. Reactive Extensions are not merely used as a wrapper but are officially supported and idiomatic when interacting with the database. Many other storage engines have a nonblocking, asynchronous API, but the creators of Couchbase chose RxJava as the best foundation for the client layer.
As an example, let’s query the sample dataset called travel-sample, which happens to have a document with the ID route_14197. In the sample dataset, the route document looks as follows:

Every query returns an Observable, and from this point we can safely transform the retrieved records in whatever way we find suitable:
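A sketch using the Couchbase Java SDK 2.x; the extracted field is just an example of drilling into the document:

```java
Bucket bucket = CouchbaseCluster.create().openBucket("travel-sample");

bucket.async()                          // AsyncBucket: every operation returns an Observable
        .get("route_14197")             // Observable<JsonDocument>
        .map(JsonDocument::content)     // JsonObject with the raw fields
        .map(json -> json.getString("airline"))
        .subscribe(System.out::println);
```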
AsyncBucket.get() returns an Observable<JsonDocument>. JSON documents are inherently loosely typed, so in order to extract meaningful information we must traverse them with prior knowledge of their structure. Knowing in advance what the document looks like, it is easy to understand transformations on JsonDocument. Amazingly, RxJava works equally well for the following:

- Data retrieval, including timeouts, caching, and error handling
- Data transformation, like extracting, filtering, drilling down into data, and aggregating

This shows the power of the Observable abstraction, which you can use in very different scenarios while still exposing the same concise API.
Just like Couchbase, MongoDB allows storing arbitrary JSON-like documents without any predefined schema. The client library has first-class support for RxJava allowing both asynchronous storing and querying of data. The following example does both of these. It first inserts 12 documents into the database; as soon as the batch insert is done, it queries them back:
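A sketch using the MongoDB RxJava driver; the collection and field names are made up:

```java
MongoCollection<Document> months = database.getCollection("months");

Observable
        .from(Month.values())
        .map(month -> new Document()
                .append("name", month.name())
                .append("days_not_leap", month.length(false))
                .append("days_leap", month.length(true)))
        .toList()
        .flatMap(docs -> months.insertMany(docs))          // Observable<Success>
        .flatMap(success -> months.find().toObservable())  // query only after insert completes
        .subscribe(doc -> System.out.println(doc.toJson()));
```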
The Month class is an enum with values from January to December; we can also easily obtain any month’s length in both leap and nonleap years. First, we create twelve BSON (binary JSON) documents, each representing one month and its length. Then we batch-insert them into a MongoCollection. This yields an Observable<Success> (the value itself does not carry any meaningful information; it is a singleton). When the Success event appears, we can query the database by calling find().toObservable(). Hopefully, the 12 documents we just inserted are found. Excluding the automatically assigned _id property for clarity, this is what is printed at the very end:
Again, the true power comes from composition. With MongoDB’s RxJava driver, you can easily query multiple collections at the same time and achieve concurrency without really thinking about it much. The code snippet that follows makes two concurrent requests to MongoDB and another one to some pricing service. Note that first() is not an operator on Observable; rather, it is a MongoDB operator that returns an Observable after constructing a query.
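Such a mashup might be sketched as follows; dailyPrice(), buildInvoice(), and send() are hypothetical, as are the collections and filters:

```java
Observable<Document> person =
        people.find(Filters.eq("name", "Tomasz")).first();
Observable<Document> address =
        addresses.find(Filters.eq("personId", 42)).first();
Observable<BigDecimal> price = dailyPrice(LocalDate.now());  // e.g., Retrofit-backed

Observable
        .zip(person, address, price, this::buildInvoice)
        .subscribe(invoice -> send(invoice));
```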
find() is equivalent to the WHERE clause in SQL, whereas first() is like LIMIT 1.
Technically, you can mix and match any Observables, irrespective of their nature and source. The preceding example makes two queries to MongoDB, against two different collections, and another query in dailyPrice() that can, for example, return an Observable from Retrofit making an HTTP call.

The bottom line is this: the source of an Observable is irrelevant; you can compose asynchronous computations and requests any way you like. Do you plan on querying multiple databases, combined with web services and local file-system operations? All of these can run concurrently and be composed together with the same ease. After you grasp how RxJava behaves in general, every source of Observable is the same on the surface.
Sometimes there is confusion as to which abstraction to use for concurrent programming, especially since Java 8. There are a few competing APIs that allow you to express asynchronous computation in a clean way. This section compares all of them to help you choose the right tool for the job. The available abstractions include the following:
CompletableFuture, introduced in Java 8, is a much more powerful extension of the well-recognized Future from java.util.concurrent. CompletableFuture allows registering an asynchronous callback for when the Future completes or fails, rather than blocking and waiting for the result. But its true strength comes from its composition and transformation capabilities, similar to what map() and flatMap() offer. Despite being introduced in the standard JDK, not a single class in the standard Java library depends on or uses CompletableFuture. It is perfectly usable but not very well integrated into the Java ecosystem.
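A minimal JDK-only illustration of registering callbacks and composing futures; lengthOf() is a made-up stand-in for a real asynchronous computation:

```java
import java.util.concurrent.CompletableFuture;

public class FutureDemo {

    // starts computing eagerly on ForkJoinPool.commonPool()
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    public static void main(String[] args) {
        int total = lengthOf("Warsaw")
                .thenCombine(lengthOf("Lodz"), Integer::sum)  // compose two async results
                .join();
        System.out.println(total);  // 10
    }
}
```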
Just like CompletableFuture, streams in java.util.stream were introduced in JDK 8. Streams are a way to declare a sequence of operations, such as mapping and filtering, prior to execution. All operations on a stream are lazy until a terminal operation is used, like reduce(). Also, the JDK can automatically parallelize some operations across all available cores, which sounds very compelling. Parallel streams promise transparent mapping, filtering, or even sorting of large datasets on multiple cores. Streams are typically generated from a collection but can just as well be created on the fly, and they can be infinite.
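A small JDK-only demonstration of that laziness; the counter is only there to observe when map() actually runs:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyStreams {

    // returns {mapped before terminal op, count, mapped after terminal op}
    static long[] demo() {
        AtomicInteger mapped = new AtomicInteger();
        Stream<String> pipeline = Stream.of("a", "bb", "ccc")
                .map(s -> {
                    mapped.incrementAndGet();  // side effect to observe execution
                    return s.toUpperCase();
                });
        long before = mapped.get();            // still 0: map() has not run yet
        long count = pipeline
                .filter(s -> s.length() > 1)
                .count();                      // terminal operation runs the pipeline
        return new long[]{before, count, mapped.get()};
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println(result[0] + " " + result[1] + " " + result[2]);  // 0 2 3
    }
}
```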
Observable represents a stream of events appearing at unpredictable moments in time. It can represent zero, one, a fixed, or an infinite number of events, available immediately or over time. An Observable can terminate with a completion or an error event. You should be fairly comfortable with what Observable is by now.
As RxJava matured, it became apparent that a specialized type representing exactly one result would be beneficial. The Single type is a stream that either completes with exactly one value or fails with an error. In that sense, it is much like CompletableFuture, except that Singles are lazy, meaning that they do not begin any computation until subscribed to.
Sometimes we invoke a certain computation purely for its side effects, not expecting any result. Sending an email or storing a record in a database are examples of such operations that involve I/O (and can therefore benefit from asynchronous processing) but do not return any meaningful result. Traditionally, Observable<Void> was used in such scenarios. However, the even more specific Completable type better expresses the intent of an asynchronous computation without a result. Completable can notify about completion or error of a concurrent execution and, just like all other Rx types, it is lazy.
Obviously, there are other ways of expressing asynchronous computation, for example the reactive types found in libraries such as Project Reactor. These types are somewhat similar to the ones just described. However, we will keep our list of choices short by limiting it to the JDK and RxJava. Before we continue, let me state that if your application already uses CompletableFuture rather consistently, you should probably stick to it. Some APIs provided by CompletableFuture are a bit awkward, but in general this class delivers quite good support for reactive programming. Moreover, we can expect more and more frameworks to take advantage of it and support it idiomatically. Supporting RxJava in third-party libraries is more difficult because it requires an additional dependency, whereas CompletableFuture is part of the JDK.
Let’s shift for a moment and discuss parallel streams from the standard JDK. In Java 8, when you process a moderately big collection of objects, you can transform it declaratively with optional parallelism:
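A minimal example of the idea; squaring numbers is a stand-in for real CPU-bound work:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelDemo {

    static List<Integer> squares(List<Integer> numbers) {
        return numbers.parallelStream()        // chunks run on ForkJoinPool.commonPool()
                .map(x -> x * x)
                .collect(Collectors.toList()); // encounter order is preserved
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3, 4)));  // [1, 4, 9, 16]
    }
}
```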
Notice the use of parallelStream() rather than the conventional stream() in the preceding code snippet. By using parallelStream(), we ask for the terminal operation, like collect(), to be performed in parallel rather than sequentially. Of course, this should not have any impact on the result, but it is supposed to be much faster. Under the hood, parallelStream() splits the input collection into multiple chunks, invokes the operations on each of them in parallel, and then combines the results in a divide-and-conquer spirit.
Many operators are very straightforward to parallelize, for example filter(); others are a bit more difficult, like sorted(), because after sorting each chunk separately we must combine them, which in the case of sorting means merging sorted sequences. Some operations are inherently difficult or impossible to parallelize without further assumptions. For example, reduce() can be parallelized only if the accumulating function is associative.
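A quick JDK-only illustration of why associativity matters:

```java
import java.util.stream.IntStream;

public class ReduceDemo {

    public static void main(String[] args) {
        // addition is associative: the parallel result is deterministic
        int sum = IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);
        System.out.println(sum);  // 5050

        // subtraction is not associative: chunked reduction gives arbitrary results
        int bogus = IntStream.rangeClosed(1, 100).parallel().reduce(0, (a, b) -> a - b);
        System.out.println(bogus);  // unpredictable; do not rely on it
    }
}
```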
Ideally, taking Amdahl’s law into account, on a four-CPU machine we can expect up to four times faster execution. But parallel streams have their drawbacks. To begin with, for small streams and short pipelines of transformations, the cost of context switching can be significant, to the point at which parallel streams are slower than their sequential counterparts. The problem of too fine-grained concurrency can potentially occur in RxJava as well, which is why it supports declarative concurrency via Schedulers. The situation with parallel streams is different.
Ever wondered why this feature is called parallel, not concurrent, streams? Parallel streams were designed only for CPU-intensive work and use a hardcoded thread pool (ForkJoinPool, to be precise) whose size is aligned with the number of CPUs we have. This pool is available statically and globally under ForkJoinPool.commonPool(). Every parallel stream, as well as some CompletableFuture callbacks, within the JVM shares this ForkJoinPool. All parallel streams in the entire JVM (so in multiple applications, if you are deploying WAR files onto an application server) share the same small pool. This is generally fine, because parallel streams were designed for parallel tasks that really need the CPU 100% of the time. Thus, if multiple parallel streams are invoked concurrently, they compete for the CPU no matter what.
But imagine one selfish application running an I/O operation within a parallel stream:
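The following JDK-only sketch simulates the problem; publishOverJms() is hypothetical, and its I/O latency is faked with sleep():

```java
import java.util.Arrays;
import java.util.List;

public class SelfishStream {

    // stand-in for a real JMS send; the sleep models broker/disk latency
    static void publishOverJms(String person) {
        try {
            Thread.sleep(10);  // blocks a ForkJoinPool.commonPool() worker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> people = Arrays.asList("Alice", "Bob", "Carol", "Dave");
        // while these sleeps occupy commonPool() threads, every other
        // parallel stream in the JVM is starved
        people.parallelStream().forEach(SelfishStream::publishOverJms);
    }
}
```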
publishOverJms() sends a JMS message for each person in the stream. We intentionally chose JMS sending: it seems fast, but due to delivery guarantees a JMS send will most likely touch either the network (to notify the message broker) or the disk (to persist the message locally). This tiny amount of I/O latency is enough to hold the precious ForkJoinPool.commonPool() threads for an excessively long time. Even though this program barely uses the CPU, no other code within the JVM is allowed to execute a parallel stream in the meantime. Now imagine if this were not sending over JMS but retrieving data from a web service or making an expensive database query. parallelStream() can only ever be used for entirely CPU-bound tasks; otherwise, the performance of the whole JVM takes a significant hit.
This does not imply that parallel streams are bad. However, due to the fixed thread pool powering them, they are of very limited use. Certainly, parallel streams from the JDK are not a replacement for Observable.flatMap() or other concurrency mechanisms. Parallel streams work best when executed, well… in parallel. But concurrent tasks that do not require the CPU 100% of the time, for example those blocked on network or disk, are better off using other mechanisms. Knowing the limitations of streams lets us compare futures and RxJava to see where they fit best.
The closest equivalent to CompletableFuture in RxJava is Single. You can also use Observable, keeping in mind that it can emit any number of values. One big difference between futures and the RxJava types is the laziness of the latter. When you have a reference to a CompletableFuture, you can be sure that the background computation has already begun, whereas Observable and Single will most likely begin working only when you subscribe to them. Knowing this semantic discrepancy, you can fairly easily interchange CompletableFuture and Single.
For rare cases in which the result of an asynchronous computation is unavailable or irrelevant, CompletableFuture<Void> or Observable<Void> was traditionally used. Whereas the former is quite straightforward, the latter might suggest a potentially infinite stream of empty events, whatever that means. Because rx.Single<Void> sounds as bad as a future of nothing, rx.Completable was introduced. Use Completable when your architecture has many operations that have no meaningful result (but might fail with an exception). One example of such an architecture is command-query separation (CQS), wherein commands are asynchronous and by definition have no result.
When your application deals with a stream of events appearing over time (e.g., user logins, GUI events, and push notifications), Observable is unbeatable. We never mentioned it, but since version 1.0, Java has offered java.util.Observable, which allows registering Observers and notifying all of them at once. Yet it lacks the following:

- Composition capabilities (no operators)
- Generics (the update() method takes an Object representing an arbitrary notification payload)
- Performance (the synchronized keyword is used everywhere)
- Separation of concerns (in some sense, it combines the event producer and its consumers, much like PublishSubject, under the same interface)
- Concurrency support (all observers are notified sequentially)
Still, Observable from the JDK is the best we get in standard Java for declarative modeling of events, right after the addListener() methods in the GUI packages. If your domain explicitly mentions events or a flow of data, rx.Observable<T> is hard to beat. Its declarative expressiveness, combined with a broad family of operators, can solve many of the problems you will come across. For cold Observables, you can take advantage of backpressure to control the throughput, whereas in the case of hot Observables you can use flow-control operators such as sample() or buffer().
RxJava is all about streams of events being processed in memory and on the fly. It provides a consistent, rich API that abstracts away the details of the event source. Ideally, we should keep only a very limited, fixed number of events in memory between the producer emitting events and the consumer storing them or forwarding them to another component. In reality, some components, especially when misused, can consume an unlimited amount of memory. Obviously, memory is limited, so we will eventually encounter either an OutOfMemoryError or a never-ending garbage collection loop. This section shows a few examples of uncontrolled consumption and memory leaks in RxJava and how to prevent them.
There are operators that can consume an arbitrary amount of memory, depending only on the nature of your stream. We will look at just a few of them and take some safety measures to avoid leaks. distinct(), by definition, must store all keys encountered since subscription. The default overload of distinct() compares every event with an internal cache of all events seen so far. If the same event (with respect to equals()) has not yet appeared in the stream, it is emitted and added to the cache. This cache is never evicted, to guarantee that the same event never appears again. You can easily imagine that if events are fairly big or frequent, this internal cache will just keep growing, leading to a memory leak.
For the purpose of this demonstration, we will use the following event simulating a big chunk of data:
The following program is executed in a very memory-constrained environment (
-mx32M: 32 MB of heap), emitting fairly large events as fast as it can:
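The original listing is not reproduced here; the following is a minimal sketch of such a program, where the Picture class, its fields, and the payload size are illustrative assumptions:

```java
import rx.Observable;

// Illustrative stand-in for the Picture event type mentioned in the text;
// the real fields and payload size may differ
class Picture {
    final long tag;
    final byte[] blob = new byte[1024 * 1024]; // simulate a big chunk of data

    Picture(long tag) {
        this.tag = tag;
    }
}

Observable
        .range(0, Integer.MAX_VALUE)
        .map(i -> new Picture(i))
        .distinct()           // caches a reference to every Picture ever seen
        .subscribe(p -> { }); // under -mx32M this soon throws OutOfMemoryError
```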
After running this,
OutOfMemoryError appears very quickly because the internal cache of
distinct() can no longer hold more
Picture instances. The CPU usage shortly before the crash is also quite severe, due to the garbage collector desperately trying to free some space. But even if, rather than using the entire
Picture as the key to distinguish events, we use only
Picture.tag, the program still crashes, only much later:
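That later-crashing variant could look like the following sketch (with the same hypothetical Picture type as before): distinct() with a key selector caches only the keys, here Picture.tag, but the key set itself still grows without bound.

```java
import rx.Observable;

// Same illustrative Picture type as in the previous sketch
class Picture {
    final long tag;
    final byte[] blob = new byte[1024 * 1024];

    Picture(long tag) {
        this.tag = tag;
    }
}

Observable
        .range(0, Integer.MAX_VALUE)
        .map(i -> new Picture(i))
        .distinct(picture -> picture.tag) // caches only the Long tags...
        .subscribe(p -> { });             // ...but the cache still grows forever
```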
This type of leak is even more dangerous. The problem escalates slowly without us noticing, until it finally explodes at the least expected moment, often under high load. To prove that
distinct() is the root of the memory leak, check out a similar program that does not use
distinct() but instead counts how many events were emitted per second, without any buffering. Your mileage may vary, but you can expect hundreds of thousands of large messages per second processed without much pressure on garbage collection or memory:
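Such a throughput measurement might be sketched as follows; window() plus count() keeps only a running counter per second rather than buffering events:

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

Observable
        .range(0, Integer.MAX_VALUE)
        .map(i -> new byte[1024 * 1024])  // stand-in for large Picture events
        .window(1, TimeUnit.SECONDS)      // Observable<Observable<byte[]>>
        .flatMap(Observable::count)       // events per second, counted on the fly
        .subscribe(System.out::println);
```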
So how do you avoid memory leaks related to
distinct() altogether? Simply put, this operator is inherently dangerous when used incorrectly, so keep a few rules of thumb in mind:
- Choose your key wisely. Ideally, it should have a finite and small value space: enum and byte are fine, long or String probably not. If you cannot prove that a given type will only ever have a very limited number of values (like
enum), you are risking a memory leak
- Consider
distinctUntilChanged() instead, which keeps track of only the last seen event, not all of them
- Do you really need uniqueness from the very beginning, or can you relax this requirement? Maybe you know that duplicates can only ever appear within 10 seconds of one another? Then consider running
distinct() on a small window:
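One possible shape of this idea, sketched with a synthetic Long stream: each 10-second window gets its own short-lived distinct() cache. If duplicates can only ever arrive back to back, distinctUntilChanged() is cheaper still.

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

// illustrative infinite source with occasional duplicates
Observable<Long> events = Observable
        .interval(1, TimeUnit.MILLISECONDS)
        .map(x -> x % 1000);

Observable<Long> uniquePerWindow = events
        .window(10, TimeUnit.SECONDS)          // fresh window, fresh distinct() cache
        .flatMap(window -> window.distinct()); // old caches become garbage

// if duplicates are always adjacent, only the last event needs remembering:
Observable<Long> noAdjacentRepeats = events.distinctUntilChanged();
```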
Every 10 seconds we start a new window and ensure that there are no duplicates within that window. The
window() operator emits an
Observable of all events that occurred within each time window. Unique (with respect to
distinct()) values in that window are immediately emitted. When the 10-second window is over, a new window starts, but more importantly, the cache associated with the old window is garbage-collected.
Of course, within these 10 seconds we can still have a critical number of events causing
OutOfMemoryError, so it is better to use a window of fixed length (e.g.,
window(1000)) rather than a fixed time. Also, if duplicate events happen to appear right at the end of one window and at the beginning of the next, we will not discover the duplicate. This is a trade-off you must be aware of.
The fact that
toList() can consume an unlimited amount of memory is quite obvious. Moreover, using
toList() for infinite streams makes no sense.
toList() emits just one item upon completion of the upstream source; when completion never comes,
toList() will never emit anything, but it will keep aggregating all events in memory. Using
toList() for very long streams is also questionable. You should find a way of consuming the events on the fly, or at least limit the number of upstream events using operators like take().
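For instance, capping the upstream with take() bounds the size of the aggregated list (a trivial sketch):

```java
import java.util.List;
import rx.Observable;

List<Integer> firstTen = Observable
        .range(1, 1_000_000)
        .take(10)      // cap upstream before aggregating
        .toList()      // now at most 10 elements are ever buffered
        .toBlocking()
        .single();
```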
toList() makes sense when you need to look at all events of a finite
Observable at the same time. This is rarely the case: you can apply predicates (like
exists()), count items (
count()), or reduce them to a single aggregate value (
reduce()) without ever needing all events in memory at the same time. One use case could be transforming an
Observable<Observable<T>> into an
Observable<List<T>>, where each inner
Observable has a known, fixed length:
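A sketch of what that transformation might look like; here the nested Observables come from window(), so each inner sequence has at most three elements:

```java
import java.util.List;
import rx.Observable;

Observable<Observable<Integer>> windows = Observable
        .range(1, 9)
        .window(3);                    // each inner Observable has at most 3 items

Observable<List<Integer>> lists = windows
        .flatMap(Observable::toList);  // safe: every list is small and bounded
```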
This is equivalent to the following:
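For fixed-size groups, the same result can be obtained directly (a sketch):

```java
import java.util.List;
import rx.Observable;

Observable<List<Integer>> lists = Observable
        .range(1, 9)
        .buffer(3);   // emits [1, 2, 3], [4, 5, 6], [7, 8, 9]
```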
Which brings us to
buffer(). Before using
buffer(), think deeply if you really need to have a
List<T> of all events within a time frame. Maybe an
Observable<T> is enough. For example, suppose that, given an
Observable<Incident>, you need to know whether there were more than five high-priority incidents in each second. You want to produce an
Observable<Boolean> that every second emits true if a large number of high-priority incidents occurred within that second, or false otherwise. With
buffer(), this is quite straightforward:
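A sketch of that approach, assuming a hypothetical Incident type with a high-priority flag and a dummy source (neither is from the original listing):

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

// hypothetical event type for illustration
class Incident {
    final boolean highPriority;

    Incident(boolean highPriority) {
        this.highPriority = highPriority;
    }

    boolean isHighPriority() {
        return highPriority;
    }
}

// dummy source standing in for the real Observable<Incident>
Observable<Incident> incidents = Observable
        .interval(100, TimeUnit.MILLISECONDS)
        .map(x -> new Incident(x % 2 == 0));

Observable<Boolean> alarms = incidents
        .buffer(1, TimeUnit.SECONDS)        // one List<Incident> per second
        .map(oneSecond -> oneSecond.stream()
                .filter(Incident::isHighPriority)
                .count() > 5);
```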
window() does not require buffering events into an intermediate
List but forwards them on the fly.
window() is equally convenient for the same task but keeps memory usage constant.
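The window()-based variant of the same query might look as follows (again with a hypothetical Incident type and a dummy source); count() consumes each inner Observable on the fly, so no List is ever materialized:

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

// hypothetical event type for illustration
class Incident {
    final boolean highPriority;

    Incident(boolean highPriority) {
        this.highPriority = highPriority;
    }

    boolean isHighPriority() {
        return highPriority;
    }
}

// dummy source standing in for the real Observable<Incident>
Observable<Incident> incidents = Observable
        .interval(100, TimeUnit.MILLISECONDS)
        .map(x -> new Incident(x % 2 == 0));

Observable<Boolean> alarms = incidents
        .window(1, TimeUnit.SECONDS)            // one Observable<Incident> per second
        .flatMap(oneSecond -> oneSecond
                .filter(Incident::isHighPriority)
                .count()                        // single Integer per window
                .map(count -> count > 5));
```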
Observable actually has a much richer API compared to
Stream from the JDK, so you might find yourself converting a Java
Stream into an
Observable just for the sake of its better operators. For example, streams do not support a sliding buffer. That being said, you should prefer
window() when possible, especially when the size of the internal
List accumulated by
buffer() is impossible to predict and manage.
The cache() operator is another obvious memory consumer: it keeps a reference to every single event that it ever received from upstream. It makes sense to use cache() on
Observables that are known to have a fixed, short length. For example, when an
Observable is used to model an asynchronous response of some component, using
cache() is safe and desirable. Otherwise, each
Observer would trigger the request again, potentially leading to unanticipated side effects. Conversely, caching long, possibly infinite
Observables, especially hot ones, makes very little sense. In the case of hot
Observables, you are probably not interested in historic events anyway.
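A quick sketch of the benign use: caching a short, finite Observable that models a one-off asynchronous response, so that repeated subscriptions do not repeat the work (the payload here is made up):

```java
import rx.Observable;

Observable<String> response = Observable
        .fromCallable(() -> "payload")   // e.g., an expensive one-off request
        .cache();                        // at most one element is ever retained

// both subscribers share the single cached result; the Callable runs once
response.subscribe(System.out::println);
response.subscribe(System.out::println);
```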
The same story goes for
ReplaySubject. Everything you place in such a Subject must be stored so that subsequent Observers get all notifications, not only future ones. The suggestions for both cache() and
ReplaySubject are pretty much the same: if you find yourself using them, it is up to you to guarantee that the source you are caching is finite and relatively short. Also, if possible, try not to keep a reference to a cached
Observable for too long, so that it can be garbage-collected after a while.
If you try to zip two sources, one of which is even slightly slower than the other, the
zip() (and zipWith()) operator must temporarily buffer the faster stream while waiting for corresponding events from the slower one:
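For example (a sketch; the exact periods are illustrative, one source merely needs to be much faster than the other):

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

Observable<Long> fast = Observable.interval(1, TimeUnit.MICROSECONDS);
Observable<Long> slow = Observable.interval(1, TimeUnit.SECONDS);

slow
        .zipWith(fast, (s, f) -> s + f)
        .subscribe(
                System.out::println,
                Throwable::printStackTrace); // MissingBackpressureException soon
```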
You might expect this code to eventually crash with OutOfMemoryError, because
zip() supposedly keeps an ever-growing buffer of events from the fast stream while waiting for the slow one. But this is not the case; in fact, we almost immediately get the dreadful MissingBackpressureException. The
zip() (and zipWith()) operator does not blindly receive events at whatever throughput the upstream dictates. Instead, it takes advantage of backpressure and requests only as little data as possible. Therefore, if the upstream
Observables are cold and implemented properly,
zip() will simply slow down the faster
Observable by requesting less data than it could technically produce.
In the case of
interval(), though, the mechanism does not work this way. The
interval() operator is cold because it starts the counter only when someone subscribes and each
Observer gets its own independent stream. Yet, after we already subscribed to
interval(), there is no way of slowing it down; by definition, it must emit events at a certain frequency. Therefore, it must ignore backpressure requests, possibly leading to
MissingBackpressureException. All we can do is drop the excess events:
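Applying onBackpressureDrop() to the uncontrollable fast source keeps zip()'s internal buffer bounded (a sketch; the periods are again illustrative):

```java
import java.util.concurrent.TimeUnit;
import rx.Observable;

Observable<Long> fast = Observable
        .interval(1, TimeUnit.MICROSECONDS)
        .onBackpressureDrop();   // silently discard whatever zip() did not request
Observable<Long> slow = Observable.interval(1, TimeUnit.SECONDS);

slow
        .zipWith(fast, (s, f) -> s + f)
        .subscribe(System.out::println);
```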
But how is a
MissingBackpressureException any better than an
OutOfMemoryError? Well, missing backpressure fails fast, whereas out of memory builds up slowly, consuming precious memory that could have been allocated elsewhere. But missing backpressure can also occur at the least expected moment, for example, when garbage collection happens.
It is much easier to begin with RxJava when some source of
Observables already appears in your codebase. Implementing a new
Observable from scratch is error-prone, so when various libraries have native RxJava support, getting started is much easier. In “From Collections to Observables” we slowly refactored an existing application from an imperative, collection-oriented style to a stream-oriented, declarative approach. After you introduce libraries that are sources of asynchronous
Observables, this refactoring becomes much easier. The more streams you have in your application, the further the reactive API propagates up the stack. It begins at the data-acquisition level (database, web services, and so on) and bubbles up to the service and web layers. Suddenly, the entire stack is written reactively. At some point, when the usage of RxJava reaches a certain critical point in the project, there is no longer a need for
toBlocking(), because everything is a stream, top to bottom.