A Quick Introduction
If you follow news on Android or Java development, chances are you have heard the phrase "reactive programming" more than once. Proving to be more than a trend, it is becoming harder to ignore thanks to Netflix's maturing implementation of RxJava. To raise its pertinence even more, Android jobs are increasingly listing RxJava as a skill requirement.
But this should be more exciting than stressful, as reactive programming alleviates many of the pain points of software development. Do you struggle with concurrency? Events and exception handling? What about minimizing boilerplate and making code composable and evolvable? RxJava paints a broad stroke against many problems that plague software development, and learning it is a great investment that will add value in ways you might have thought impractical before. RxJava levels the playing field between events and data by treating them exactly the same way.
There are a shortage of tutorials to teach RxJava in a pragmatic way. I hope to change that with this short article to help you get started. This will not make you an expert overnight (and I will recommend some resources), but it will definitely give you a jump start.
RxJava is a small lightweight library, making it appropriate for usage with Android apps. You can download it manually from The Central Repository or add it as a dependency in Gradle as shown below.
There is a rich ecosystem of RxJava extensions through other various libraries. RxJava-JDBC is an excellent library that simplifies database SQL querying with RxJava.RxAndroid bridges Android components to RxJava, and RxKotlin streamlines RxJava for the Kotlin language. There is a vast number of libraries that adapt RxJava for different problems, but for now let's just focus on the core fundamentals of RxJava, starting with the
The core type in RxJava is the
Observable<T>. In its simplest definition, an
Observable pushes things. It pushes items of type
T through a series of operations, until each
T item arrives at a
Subscriber<T> where it is finally consumed. Most of the time an
Observable<T> has an finite number of
T items to push, but sometimes it may push an infinite number of items over time (like a JavaFX
Button pushing an
ActionEvent object every time it is clicked).
Here is a simple example of an
Observable<Integer>. It pushes a finite series of
Observable<Integer> numbers = Observable.just(1,2,3,4,5);
This is a source observable because this is where emissions originate from. An emission is a single pushed item of type
T. We declared a source
Observable<Integer> that will push the numbers 1 through 5, although nothing has actually been pushed yet. We need to create a
Subscriber to request the
Integer emissions and consume them. To quickly create a
Subscriber, we can call
subscribe() and pass a lambda dictating what to do with each received
Integer. In this example we will just print them.
Observable<Integer> numbers = Observable.just(1,2,3,4,5); numbers.subscribe(i -> System.out.println(i));
Wrap the above code in a
main() method, run the program, and you will get the following output
1 2 3 4 5
A lambda is a special type of argument specifying an instruction. In this case, the
subscribe() takes a lambda argument and I provided the lambda
i -> System.out.println(i). I arbitrarily named each incoming integer
i. The arrow
-> points to an instruction on what to do with each
i item, and here I am passing it to
System.out.println(i) to be printed.
Note that Android Nougat will be the first Android release to support Java 8 and lambda expressions. For earlier Android releases, you can leverage lambdas through the RetroLambda library. You can also leverage Kotlin as a more functional language for Android development, and it works phenomenally with RxJava.
Note there are many static factories to create source Observables. Since these
Integer emissions are consecutive, I could use
Observable.range() instead to accomplish the same thing. This will produce a source
Observable that pushes the Integers 1 through 5.
Observable<Integer> numbers = Observable.range(1,5); numbers.subscribe(i -> System.out.println(i));
1 2 3 4 5
Another common factory is
Observable.from() which will turn any collection into a finite
List<String> list = Arrays.asList("Alpha","Beta","Gamma","Delta","Epsilon"); Observable<String> items = Observable.from(list); items.subscribe(s -> System.out.println(s));
Alpha Beta Gamma Delta Epsilon
Let's introduce operators to make this more interesting. Build another
numbers by calling its
map() operator, and pass a lambda that multiplies each pushed integer by 100. Save that new
Observable to a variable and
subscribe() to it.
Observable<Integer> numbers = Observable.just(1,2,3,4,5); Observable<Integer> multiplied = numbers.map(i -> i * 100); multiplied.subscribe(i -> System.out.println(i));
100 200 300 400 500
We inserted an operation between the source
Observable and the
Subscriber that transforms each incoming
map(). You can also do this in one line of code without any variables.
Observable.just(1,2,3,4,5).map(i -> i * 100).subscribe(i -> System.out.println(i));
We eliminated saving any Observables to variables, and of course you can break this "chain" up in separate lines to aid readability.
Observable.just(1,2,3,4,5) .map(i -> i * 100) .subscribe(i -> System.out.println(i));
You can use
map() and other operators to transform to new types. For example, you can start with
String items but then
map() them to
Integer items through their lengths.
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .map(s -> s.length()) .subscribe(l -> System.out.println(l));
5 4 5 5 7
RxJava has hundreds of operators and continues to get more. The
distinct() can be used to push emissions but holds back any duplicates that have already been emitted.
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .map(s -> s.length()) .distinct() .subscribe(l -> System.out.println(l));
5 4 7
You can also supply a lambda to the
distinct() operator to distinct on something specific about the item, like its
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .distinct(s -> s.length()) .subscribe(l -> System.out.println(l));
Alpha Beta Epsilon
Of course, there are also operators like
filter() which suppress emissions that fail to meet a specified lambda condition.
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .filter(s -> s.length() <= 5) .subscribe(l -> System.out.println(l));
Some operators will collect all emissions in some form and consolidate them into a single emission. For example,
count() will count all emissions and then push that count to the
Subscriber once its done.
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .count() .subscribe(l -> System.out.println(l));
toList() will do something similar. It will collect all emissions and when it receives all of them, it will push an entire
List<T> holding all the emissions to the
Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .toList() .subscribe(l -> System.out.println(l));
[Alpha, Beta, Gamma, Delta, Epsilon]
toList() will not work with infinite Observables that emit an infinite number of items over time. This is something to always be aware of and we will revisit this later.
Finally, there is one last critical operator we need to cover: the
flatMap(). It is like
map()except you use it to take an emission, and replace it with another set of emissions from another
Observable. This is arguably one of the most powerful operators in RxJava, and to this day I still find new ways to leverage it. But here is the simplest example. Let's say you have three Strings holding concatenated sets of numbers.
Observable<String> concatenatedSets = Observable.just("1/5/8", "1/9/11/58/16", "9/15/56/49/21");
Say you wanted to break these numbers up into individual emissions, and remove the slashes. This can be done with the
String emission on the forward slash
/, then call
Observable.from() to turn that
Array<String> into an
Observable<String>. Each individual numeric
String will be consolidated into a single
Observable<String>, and you can convert each one into an
Observable<String> concatenatedSets = Observable.just("1/5/8", "1/9/11/58/16/", "9/15/56/49/21"); concatenatedSets.flatMap(s -> Observable.from(s.split("/"))) .map(s -> Integer.valueOf(s)) .subscribe(i -> System.out.println(i));
1 5 8 1 9 11 58 16 9 15 56 49 21
flatMap() has many interesting usages especially when it is composed with hot Observables, which we will cover later. But for now just know it replaces an emission with another set of emissions, by mapping the emission to another
Push-Based vs. Pull-Based
Experienced developers reading this article may be asking "How is an
Observable any different than a Java 8
Stream? Or a Kotlin
Sequence?" Why do I need it especially when Java 8 is coming on Android Nougat?
With everything we covered to this point, yes the
Observable does not seem much different than either of these utilities. But it is important to note that the
Observablepushes. Java 8 Streams and Kotlin Sequences pull. Next we will investigate what pushing allows: a notion of emissions over time.