Aitor Viana

Nov 15, 2016 1:38:52 PM

Writen by Aitor Viana

Easy RX.png

You can’t model your software systems in a complete synchronous fashion…that is in the past. However, you probably continue developing mainly using imperative programming, and having asynchronous sources forces you to handle all that complexity yourself.

It is immediately clear that you cannot treat your asynchronous sources in a synchronous fashion. In Android, that would mean blocking the UI thread every time you need to perform a network call. Another thing you might think of doing is just performing the asynchronous tasks assumming they will always succeed…not cool!

Imagine the following simple scenario. You need to first authenticate a user and only then get its privileges from your server.

    interface LoginManager {
        void authenticate(String username);
        void getPrivileges(String username);
    }

You have probably encountered this situation or a very similar one, and you have probably solved it somehow similar to the code bellow:

    final LoginManager loginManager new LoginManager();
    String name = "Cool name";
    
    loginManager.login(name, new LoginManager.Listener() {
        @Override
        public void success() {
            loginManager.getPrivileges(name, new LoginManager.Listener() {
                @Override
                public void success() {
                    // TODO: all systems go
                }
    
                @Override
                public void error() {
                    // TODO: ups...
                }
            });
        }
    
        @Override
        public void error() {
            // TODO: show error
        }
    });

And you tell me “this is easy, this structure looks familiar to me, I’ve done it before no problem”. And you are right, it is easy but, it is not simple.

Let’s refocus on Android. You have this call somewhere inside the context of an Activity, and before you know it, you need to start handling the lifecycle of the Activity (because you know, rotation), and making sure you are a good Android citizen not leaving asynchronous tasks without any listener and possibly leaking memory.

Add to that the fact that your asynchronous methods run on a background thread, and when they all succeed you need to do something on the UI thread. Maybe update a TextView. You need to wrap whatever code is inside that most inner success() call with runOnUiThread() to hop back to the UI thread and update your views.

…Uhm! but the task was simple, wasn’t it? You just wanted your code to (1) hit the server to authenticate the user; (2) then retrieve the user privileges, and (3) log any possible error along the way - because you know, sh*t happens.

Instead you ended up with this monster code of nested calls with various error exit points and having to handle a bunch of state. And down there, buried in all this complexity of nested listeners, callbacks, and state, lies that tiny bit of code to authenticate the user and get its privileges…you are now a very sad developer…and then reactive happened!

Reactive Programming

Wikipedia says:

In computing, reactive programming is a programming paradigm oriented around data flows and the propagation of change.

In English please, and with an example:

If your UI were to update when something in the database changes, with reactive you can kill the middle man (used to be yourself) and now hook these two things together so that the UI reacts whenever something changes in the database.

You can leverage off reactive programming using Reactive Extensions (Rx). For you Java and Android developer, RxJava is what you are looking for. But they are also available in many other languages.

RxJava: The Basics

Rx focuses on data. More specifically in data streams that you can observe and react to.

In Rx you can easily set which thread certain tasks will be performed on, combine different data streams, transform, modify, filter them…you name it.

Main concepts in Rx

Observables. Emit any number of items (even 0) and terminate due to an error or because they run out of items to emit. They might even emit infinitely (if no error occurs). For instance, an Observable that emits button click events.

Operators. Operate on the items allowing transformations, filtering, merging, etc.

Schedulers. Allow you to specify in which thread the things occur on.

Subscribers. Observe the data stream and react to it.

OK, show me the code!

Let’s build a hello-world using RxJava and some of the concepts explained above. This can be overwelming, I know, just bear with me. It will all make sense…eventually.

Observable that emits String items every second and forever.

    Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long number) {
                return "String number " + number;
            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                // the data stream completed
            }
        
            @Override
            public void onError(Throwable e) {
                // an error occurred
            }
        
            @Override
            public void onNext(String s) {
                // next string has been emitted
            }
        });

Let’s break that down.

The first part is the Observable creation. It actually emits numbers of type Long.

    Observable.interval(1, TimeUnit.SECONDS)

The next part of the code is the map() operator. That takes in the Long emitted item, and transforms it into a String item that is return instead.

    Observable.interval(...)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long number) {
                return "String number " + number;
            }
        })

The last part is the Subscriber. That listens to any new String item being emitted and reacts to it.

    Observable.interval(...)
        .map(...)
        .subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                // the data stream completed
            }
        
            @Override
            public void onError(Throwable e) {
                // an error occurred
            }
        
            @Override
            public void onNext(String s) {
                // next string has been emitted
            }
        });

Note that the subscriber does not even know the original source of items is in reality a number of type Long.

For now just ignore the type of events the Subscriber can react to. You will see why later on.

How to create obsevables

You’ve seen above how to create a simple Observable that emits some items every second and forever, using the .interval() factory method. There are many other ways to create observables. Here's a few of them.

Never use Observable.create()

but wait! it is called create()…right? O_o

Yes, the name kind of invites you to use it to create observables. The fact is, you need to craft the Observable rather than creating it, and handle everything by yourself.
Just stay away from this method for now, until you master Rx. There will be cases (very very few) where you might need it. Certainly not now.

Observable.just()

Emits a single item and then completes.

    Observable.just("Hello I am Muzzy");
    // Emits: "Hello I am Muzzy" and then completes

Observable.from()

Converts a sequence of items into an observable and emits them in sequence.

    ArrayList<String> muzzy = new ArrayList<>(Arrays.asList("Hello", "I", "am", "Muzzy"));

    Observable.from(muzzy);
    // Emits: "Hello" ---> "I" ---> "am" ---> "Muzzy"

Observable.fromCallable()

Converts a callable (e.g. method) into an Observable that invokes the callable when the observer subscribes to it, and then emits the value it returns.

    String muzzy() { return "Hello I am Muzzy"; }
    
    Observable.fromCallable(new Func0<String>() {
        @Override
        public String call() {
            return muzzy();
        }
    });
    // Emits: "Hello I am Muzzy" and then completes

Subscriber(s)

To subscribe to an observable you will use the method Observable.subscribe(), that returns a Subscriber. You’ve seen it in the example before.

Observable.subscribe() takes in from zero to three input parameters. When provided, those are callbacks to onNext(), onError() and onCompleted() respectively.

  • onNext() callback will be called for every item emission and takes in a parameter of the same type of the item being emitted.
  • onError() callback will be called upon error on the observable emission chain and takes in one parameter of type Throwable. This is a terminal event and will stop the emission of any subsequent item.
  • onCompleted() callback is called when the observable is done emitting items and takes no parameter. This is also a terminal event, this time because there is no more items in the stream.

Taking the examples above that emit String items, you don’t need to care about onCompleted() nor onError(), so instead you can use a simpler class to define your onNext() action.

    Action1<String> onNextAction = new Action1<String>() {
        @Override
        public void call(String s) {
            log(s);
        }
    };

And then subscribe to the observable.

    Observable.from(muzzy).subscribe(onNextAction)

If you are otherwise insterested in all possible actions, you can subscribe to all of them.

    Observable.from(muzzy).subscribe(onNextAction, onErrorAction, onCompletedAction)

For most cases I always recommend to at least register the onError() callback.

Operators

Operators sit between the source Observable and the Subscriber to manipulate and transform the emitted items.

Operator map()

You have seen it in action before, and it is a very simple operator. It takes in a source Observable, performs some transformation to it, and emits the result, which can be of the same or different type.

If you remember our initial example, it was taking in an item of type Long and transforming it into a String that was emitted instead.

There are many more operators available, and you can chain as many of them as you want between the Observable source and the Subscriber.

Schedulers

What about threads? You might ask, "Where is my work running?"

To answer that we’ve got these two operators that tell observables what should be done where.

  • subscribeOn(). It defines in which thread the Observable will start its execution on upon subscription. It should only be used once. If called more than once, the usage closest to the Observable source takes precedence. If not, the specified Observable defaults to the thread you subscribe on until it is changed.
  • observeOn(). You can use this as many times as needed. It affects everything downstream.

In English and with an example, please.

    Observable.from(Arrays.asList("Hello", "I", "am", "Muzzy"))
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.d(TAG, "doOnNext() called with: s = [" + s + "] on thread [" + Thread.currentThread() + "]");
                }
            }).observeOn(Schedulers.io())
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.d(TAG, "onNext() called with: s = [" + s + "]");
                }
            });

You haven’t seen the operator doOnNext(), but it is simple. When onNext() fires it also does what specified on doOnNext.

Let’s see the output:

D/MainActivity: doOnNext() called with: strings = [Hello] on thread [Thread[main,5,main]]
D/MainActivity: doOnNext() called with: strings = [I] on thread [Thread[main,5,main]]
D/MainActivity: doOnNext() called with: strings = [am] on thread [Thread[main,5,main]]
D/MainActivity: doOnNext() called with: strings = [Muzzy] on thread [Thread[main,5,main]]

D/MainActivity: onNext() called with: strings = [Hello] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: onNext() called with: strings = [I] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: onNext() called with: strings = [am] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: onNext() called with: strings = [Muzzy] on thread [Thread[RxIoScheduler-2,5,main]]

Awesome! because we didn’t call subscribeOn, doOnNext() was executed and the thread (the main thread) subscribe() was called on. Later, observeOn changed over the io thread, and onNext was called on that thread.

Let’s subscribeOn a different thread and run it again.

    Observable.from(Arrays.asList("Hello", "I", "am", "Muzzy"))
        .subscribeOn(Schedulers.io())
        .doOnNext(...)
        .observeOn(Schedulers.io())
        .subscribe(...);

And the output.

D/MainActivity: doOnNext() called with: strings = [Hello] on thread [Thread[RxIoScheduler-3,5,main]]
D/MainActivity: doOnNext() called with: strings = [I] on thread [Thread[RxIoScheduler-3,5,main]]
D/MainActivity: doOnNext() called with: strings = [am] on thread [Thread[RxIoScheduler-3,5,main]]
D/MainActivity: onNext() called with: strings = [Hello] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: onNext() called with: strings = [I] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: onNext() called with: strings = [am] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: doOnNext() called with: strings = [Muzzy] on thread [Thread[RxIoScheduler-3,5,main]]
D/MainActivity: onNext() called with: strings = [Muzzy] on thread [Thread[RxIoScheduler-2,5,main]]

See what I did there? Now doOnNext() is also executed in the io thread.

One last twist. Change the scheduler in observeOn()

    Observable.from(Arrays.asList("Hello", "I", "am", "Muzzy"))
        .subscribeOn(Schedulers.io())
        .doOnNext(...)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(...);

And, once more, the output.

D/MainActivity: doOnNext() called with: strings = [Hello] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: doOnNext() called with: strings = [I] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: doOnNext() called with: strings = [am] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: doOnNext() called with: strings = [Muzzy] on thread [Thread[RxIoScheduler-2,5,main]]
D/MainActivity: onNext() called with: strings = [Hello] on thread [Thread[main,5,main]]
D/MainActivity: onNext() called with: strings = [I] on thread [Thread[main,5,main]]
D/MainActivity: onNext() called with: strings = [am] on thread [Thread[main,5,main]]
D/MainActivity: onNext() called with: strings = [Muzzy] on thread [Thread[main,5,main]]

Self explanatory, right?…right?

Now what?

You might be thinking “this is a lot of complexity and boilerplate code for something so simple” and you’re right. This was a very simple example. But think for a second how powerful all this is when applied to more complex situations.

Your Observable source could be anything, a database, a network call, a UI button that emits clicks. You can even combine several Observables into one data stream that you can subscribe on.

For instance, you could combine your database Observable and your network call Observable into one. Filter out the repeated elements. Store the new ones not yet present in the database and update the UI ListView…all in a handful lines of code.

So go ahead, start playing with RxJava, but remember, with great power comes great responsibility.

Related Search Term(s): RxJava

Create, Design, Develop and Connect at AnDevCon D.C. 2017!

Thoughts? Leave a comment: