Thomas Nield

RX_Java_Part_10.png

A Quick Introduction

If you follow news on Android or Java development, chances are you have heard the phrase "reactive programming" more than once. Proving to be more than a trend, it is becoming harder to ignore thanks to Netflix's maturing implementation of RxJava. To raise its pertinence even more, Android jobs are increasingly listing RxJava as a skill requirement.

But this should be more exciting than stressful, as reactive programming alleviates many of the pain points of software development. Do you struggle with concurrency? Events and exception handling? What about minimizing boilerplate and making code composable and evolvable? RxJava paints a broad stroke against many problems that plague software development, and learning it is a great investment that will add value in ways you might have thought impractical before. RxJava levels the playing field between events and data by treating them exactly the same way.

There are a shortage of tutorials to teach RxJava in a pragmatic way. I hope to change that with this short article to help you get started. This will not make you an expert overnight (and I will recommend some resources), but it will definitely give you a jump start.

Setting Up

RxJava is a small lightweight library, making it appropriate for usage with Android apps. You can download it manually from The Central Repository or add it as a dependency in Gradle as shown below.


compile 'io.reactivex:rxjava:1.1.8'

There is a rich ecosystem of RxJava extensions through other various libraries. RxJava-JDBC is an excellent library that simplifies database SQL querying with RxJava.RxAndroid bridges Android components to RxJava, and RxKotlin streamlines RxJava for the Kotlin language. There is a vast number of libraries that adapt RxJava for different problems, but for now let's just focus on the core fundamentals of RxJava, starting with the Observable.

The Observable and Subscriber

The core type in RxJava is the Observable<T>. In its simplest definition, an Observable pushes things. It pushes items of type T through a series of operations, until each T item arrives at a Subscriber<T> where it is finally consumed. Most of the time an Observable<T> has an finite number of T items to push, but sometimes it may push an infinite number of items over time (like a JavaFX Button pushing an ActionEvent object every time it is clicked).

Here is a simple example of an Observable<Integer>. It pushes a finite series of Integerobjects.


Observable<Integer> numbers = Observable.just(1,2,3,4,5);

This is a source observable because this is where emissions originate from. An emission is a single pushed item of type T. We declared a source Observable<Integer> that will push the numbers 1 through 5, although nothing has actually been pushed yet. We need to create a Subscriber to request the Integer emissions and consume them. To quickly create a Subscriber, we can call subscribe() and pass a lambda dictating what to do with each received Integer. In this example we will just print them.


Observable<Integer> numbers = Observable.just(1,2,3,4,5); numbers.subscribe(i -> System.out.println(i));

Wrap the above code in a main() method, run the program, and you will get the following output

Output:


1 2 3 4 5

A lambda is a special type of argument specifying an instruction. In this case, the subscribe() takes a lambda argument and I provided the lambda i -> System.out.println(i). I arbitrarily named each incoming integer i. The arrow -> points to an instruction on what to do with each i item, and here I am passing it to System.out.println(i) to be printed.

Note that Android Nougat will be the first Android release to support Java 8 and lambda expressions. For earlier Android releases, you can leverage lambdas through the RetroLambda library. You can also leverage Kotlin as a more functional language for Android development, and it works phenomenally with RxJava.

Note there are many static factories to create source Observables. Since these Integer emissions are consecutive, I could use Observable.range() instead to accomplish the same thing. This will produce a source Observable that pushes the Integers 1 through 5.


Observable<Integer> numbers = Observable.range(1,5); numbers.subscribe(i -> System.out.println(i));

Output:


1 2 3 4 5

Another common factory is Observable.from() which will turn any collection into a finite Observable.


List<String> list = Arrays.asList("Alpha","Beta","Gamma","Delta","Epsilon"); Observable<String> items = Observable.from(list); items.subscribe(s -> System.out.println(s));

Output:


Alpha Beta Gamma Delta Epsilon

Observable Operators

Let's introduce operators to make this more interesting. Build another Observable<Integer> off numbers by calling its map() operator, and pass a lambda that multiplies each pushed integer by 100. Save that new multiplied Observable to a variable and subscribe() to it.


Observable<Integer> numbers = Observable.just(1,2,3,4,5); Observable<Integer> multiplied = numbers.map(i -> i * 100); multiplied.subscribe(i -> System.out.println(i));

Output:


100 200 300 400 500

We inserted an operation between the source Observable and the Subscriber that transforms each incoming Integer with map(). You can also do this in one line of code without any variables.


Observable.just(1,2,3,4,5).map(i -> i * 100).subscribe(i -> System.out.println(i));

 We eliminated saving any Observables to variables, and of course you can break this "chain" up in separate lines to aid readability.


Observable.just(1,2,3,4,5) .map(i -> i * 100) .subscribe(i -> System.out.println(i));

You can use map() and other operators to transform to new types. For example, you can start with String items but then map() them to Integer items through their lengths.


Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .map(s -> s.length()) .subscribe(l -> System.out.println(l));

Output:


5 4 5 5 7

RxJava has hundreds of operators and continues to get more. The distinct() can be used to push emissions but holds back any duplicates that have already been emitted.

Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
        .map(s -> s.length())
        .distinct()
        .subscribe(l -> System.out.println(l));

Output:


5 4 7

You can also supply a lambda to the distinct() operator to distinct on something specific about the item, like its length().


Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .distinct(s -> s.length()) .subscribe(l -> System.out.println(l));

Output:


Alpha Beta Epsilon

Of course, there are also operators like filter() which suppress emissions that fail to meet a specified lambda condition.


Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .filter(s -> s.length() <= 5) .subscribe(l -> System.out.println(l));

Output:


Alpha Beta
Gamma Delta

Some operators will collect all emissions in some form and consolidate them into a single emission. For example, count() will count all emissions and then push that count to the Subscriber once its done.


Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .count() .subscribe(l -> System.out.println(l));

Output:


5

The toList() will do something similar. It will collect all emissions and when it receives all of them, it will push an entire List<T> holding all the emissions to the Subscriber.


Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .toList() .subscribe(l -> System.out.println(l));

Output:


[Alpha, Beta, Gamma, Delta, Epsilon]

Logically, count() and toList() will not work with infinite Observables that emit an infinite number of items over time. This is something to always be aware of and we will revisit this later.

 

Finally, there is one last critical operator we need to cover: the flatMap(). It is like map()except you use it to take an emission, and replace it with another set of emissions from another Observable. This is arguably one of the most powerful operators in RxJava, and to this day I still find new ways to leverage it. But here is the simplest example. Let's say you have three Strings holding concatenated sets of numbers.


Observable<String> concatenatedSets = Observable.just("1/5/8", "1/9/11/58/16", "9/15/56/49/21");

Say you wanted to break these numbers up into individual emissions, and remove the slashes. This can be done with the flatMap(). First split() each String emission on the forward slash /, then call Observable.from() to turn that Array<String> into an Observable<String>. Each individual numeric String will be consolidated into a single Observable<String>, and you can convert each one into an Integer.

Observable<String> concatenatedSets =
        Observable.just("1/5/8", "1/9/11/58/16/", "9/15/56/49/21");

concatenatedSets.flatMap(s -> Observable.from(s.split("/")))
        .map(s -> Integer.valueOf(s))
        .subscribe(i -> System.out.println(i));

Output:


1 5 8 1 9 11 58 16 9 15 56 49 21

The flatMap() has many interesting usages especially when it is composed with hot Observables, which we will cover later. But for now just know it replaces an emission with another set of emissions, by mapping the emission to another Observable.

Push-Based vs. Pull-Based

Experienced developers reading this article may be asking "How is an Observable any different than a Java 8 Stream? Or a Kotlin Sequence?" Why do I need it especially when Java 8 is coming on Android Nougat?

With everything we covered to this point, yes the Observable does not seem much different than either of these utilities. But it is important to note that the Observablepushes. Java 8 Streams and Kotlin Sequences pull. Next we will investigate what pushing allows: a notion of emissions over time.

Click Here for  Part 2

thomas_nield_photo.jpgThomas Nield

Thomas Nield is a software developer and business analyst at Southwest Airlines working in Revenue Management. He is the author of the O'Reilly book Getting Started with SQL and contributes to RxJava, Kotlin, and JavaFX libraries. 

@thomasnield9727
https://www.linkedin.com/

Thomas Nield's Blog

Related Search Term(s): RxJava, Tutorials

Create, Design, Develop and Connect at AnDevCon D.C. 2017!

Thoughts? Leave a comment: