Thomas Nield


Observables and Timing

While the examples in part one do not reflect this, Observables actually do have a notion of "emissions over time". For example, Observable.interval() will emit consecutive long values at a specified time interval.


Observable<Long> counter = Observable.interval(1, TimeUnit.SECONDS);

counter.map(l -> l * 1000).subscribe(l -> System.out.println(l));

// sleep to prevent exiting before Observable has chance to fire
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Output:

0
1000
2000
3000
4000
5000
6000
7000
8000
9000

Note too that this Observable actually will fire on a separate thread (not the main thread like the earlier examples), because a timer requires a separate thread. We also had to sleep() the main thread so the program does not exit prematurely before the emissions had a chance to fire. You can learn more about RxJava concurrency on my blog. For now let's not digress and save concurrency for another time.

Another way Observables have a notion of timing is how they behave when they are subscribed to. All the examples we saw earlier are cold Observables, meaning they will "replay" the emissions to each Subscriber. They will repeat the emissions each time they are subscribed to.


Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");

items.subscribe(s -> System.out.println(s));

items.map(s -> s.length()).subscribe(i -> System.out.println(i));

Output:

Alpha
Beta
Gamma
Delta
Epsilon
5
4
5
5
7

Cold Observables are ideal for pushing data, as every Subscriber will get all the data emissions. Hot Observables, by contrast, fire "live" emissions and will not replay missed items to new Subscribers. This is appropriate for Button clicks, network requests, and other entities representing "events" rather than "data".

Hot Observables can be used for different purposes and be created in many ways. Sometimes you may want to convert a cold Observable into a hot one, perhaps because the emissions are expensive to replay. One way to turn a cold Observable into a hot one is to make it a ConnectableObservable. Call any Observable's publish() method, set up its Subscribers, and then call connect() to fire the emissions hotly to all of them at once.


ConnectableObservable<String> items =
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon").publish();

items.subscribe(s -> System.out.println(s));

items.map(s -> s.length()).subscribe(i -> System.out.println(i));

items.connect();

Output:

Alpha
5
Beta
4
Gamma
5
Delta
5
Epsilon
7

Notice too how the emissions interleave, indicating they are not being replayed to each Subscriber separately but rather firing each emission to both Subscribers at once.

Here's an experiment to prove hot Observables don't replay emissions to tardy Subscribers. Build a ConnectableObservable off an Observable.interval(). Then connect() it immediately with one Subscriber subscribed, but wait 5 seconds and add another Subscriber later. What do you think will happen?


public static void main(String[] args) {

    ConnectableObservable<Long> timer = Observable.interval(1, TimeUnit.SECONDS).publish();

    timer.subscribe(i -> System.out.println(i));
    timer.connect();

    sleep(5000);

    timer.map(i -> i * 1000).subscribe(i -> System.out.println(i));

    sleep(5000);
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Output:

0
1
2
3
4
5
5000
6
6000
7
7000
8
8000
9
9000

Notice how the second Subscriber never received a 1000, 2000, 3000, or 4000. It started at 5000 because it missed emissions 0 through 4. This is because the Observable was made hot and it does not replay missed emissions to new Subscribers.

The best definition I have heard of a Cold Observable versus a Hot Observable is by Nickolay Tsvetinov, author of Learning Reactive Programming with Java 8:

We can say that cold Observables generate notifications for each subscriber and hot Observables are always running, broadcasting notifications to all of their subscribers. Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song. A cold Observable is a music CD. Many people can buy it and listen to it independently.
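The radio-versus-CD distinction can be modeled without RxJava at all. The sketch below is a hand-rolled illustration, not RxJava's implementation: ColdSource, HotSource, and the two demo methods are hypothetical names invented for this example. It only shows the replay-versus-broadcast behavior described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model for illustration only; these are not RxJava types.
class ColdHotSketch {

    // Cold source (the CD): every subscriber gets the full sequence replayed.
    static class ColdSource<T> {
        private final List<T> items;

        ColdSource(List<T> items) { this.items = items; }

        void subscribe(Consumer<T> subscriber) {
            for (T item : items) subscriber.accept(item);
        }
    }

    // Hot source (the radio station): an emission reaches only current subscribers.
    static class HotSource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) { subscribers.add(subscriber); }

        void emit(T item) {
            for (Consumer<T> subscriber : subscribers) subscriber.accept(item);
        }
    }

    // Returns what two subscribers each saw from the same cold source.
    static List<List<String>> coldDemo() {
        ColdSource<String> items = new ColdSource<>(Arrays.asList("Alpha", "Beta", "Gamma"));
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        items.subscribe(first::add);
        items.subscribe(second::add);   // gets the same items replayed
        return Arrays.asList(first, second);
    }

    // Returns what an early and a late subscriber each saw from a hot source.
    static List<List<Long>> hotDemo() {
        HotSource<Long> timer = new HotSource<>();
        List<Long> early = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        timer.subscribe(early::add);
        for (long i = 0; i < 5; i++) timer.emit(i);    // late subscriber is not listening yet
        timer.subscribe(late::add);
        for (long i = 5; i < 10; i++) timer.emit(i);   // both subscribers receive these
        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        System.out.println("cold: " + coldDemo());
        System.out.println("hot:  " + hotDemo());
    }
}
```

In hotDemo() the late subscriber ends up with only 5 through 9, which mirrors the tardy Subscriber in the interval() experiment.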

As you use RxJava, you will find yourself mixing cold and hot Observables together to compose events and data in a single stream. For example, you can take a hot Observable, then flatMap() it to a cold Observable to push a List<Integer> every second.


Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");

ConnectableObservable<Long> timer = Observable.interval(1, TimeUnit.SECONDS).publish();

timer.flatMap(i -> items.map(s -> s.length()).toList())
        .subscribe(l -> System.out.println(l));

timer.connect();

sleep(5000);

Output:

[5, 4, 5, 5, 7]
[5, 4, 5, 5, 7]
[5, 4, 5, 5, 7]
[5, 4, 5, 5, 7]
[5, 4, 5, 5, 7]

Just be mindful when you cross the two, as emitting data from a hot Observable can easily result in data being missed by Subscribers. And definitely do not use operators like toList() and count() on an infinite Observable. They will work infinitely and never emit anything. We will find out technically why this is next.
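If you do need a List from an interval, one option is to make the stream finite first. The sketch below assumes RxJava 1.x (rx.Observable) is on the classpath and uses take(), an operator not covered in this article, to cut the infinite interval off after five emissions. The class name BoundedToList and the firstFive() helper are hypothetical.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;

// Sketch assuming RxJava 1.x (rx.Observable) is on the classpath.
class BoundedToList {

    // take(5) completes the stream after five items, so toList() can emit its List.
    static List<Long> firstFive() {
        return Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(5)
                .toList()
                .toBlocking()   // block the calling thread until the List arrives
                .single();
    }

    public static void main(String[] args) {
        System.out.println(firstFive());   // prints [0, 1, 2, 3, 4]
    }
}
```

Without the take(5), the call to single() would block forever, because toList() would never be told that the emissions are done.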

How Observables Work

An Observable is a class that has many implementations. Calling the map() operator on any Observable will return an Observable implementation built for that purpose. The filter(), flatMap(), and the hundreds of other operators return different implementations of Observable as well. Thankfully, these implementations are abstracted away from you. What is pertinent is that the Observable calls three methods on an Observer to communicate and pass items "up the chain".

Observer Methods

Method          Parameter     Description
onNext()        T item        Pushes an item to the Observer
onCompleted()   None          Notifies Observer that it is done
onError()       Throwable e   Notifies Observer of an error

The Subscriber implements the Observer interface, and operator Observables like the ones returned by map() or filter() are both an Observable and Observer. You do not have public access to these Observer methods, but internally they are used to receive and push notifications between operator Observables.


Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        // calls onNext() on map()
        .map(s -> s.length())
        // calls onNext() on filter()
        .filter(i -> i <= 5)
        // calls onNext() on Subscriber
        .subscribe(l -> System.out.println(l));

This makes sense because when we pass items through a series of operators, items are passed from the source Observable, then to each operator, and then finally to the Subscriber.

The onNext() is what passes emissions "up the chain". The onCompleted() pushes a notification that there are no more emissions, and onError() pushes an error that occurred somewhere in the chain. Notice how all three of these methods "push" something all the way to the Subscriber. Again, "pushing" is a common theme in reactive programming.
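To make the idea of an operator being both an Observable and an Observer more concrete, here is a hand-rolled sketch using only the JDK. SimpleObserver, SimpleObservable, and the just() and map() helpers are hypothetical, simplified stand-ins written for this example. RxJava's real classes handle unsubscription, error trapping, and much more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified types for illustration; RxJava's real classes differ.
class OperatorChainSketch {

    interface SimpleObserver<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    interface SimpleObservable<T> {
        void subscribe(SimpleObserver<T> observer);
    }

    // Source: pushes each item to the observer, then signals completion.
    static <T> SimpleObservable<T> just(List<T> items) {
        return observer -> {
            for (T item : items) observer.onNext(item);
            observer.onCompleted();
        };
    }

    // map(): an Observer of the source and an Observable for whatever is downstream.
    static <T, R> SimpleObservable<R> map(SimpleObservable<T> source, Function<T, R> mapper) {
        return downstream -> source.subscribe(new SimpleObserver<T>() {
            public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
            public void onCompleted() { downstream.onCompleted(); }
            public void onError(Throwable e) { downstream.onError(e); }
        });
    }

    // Builds just(...) -> map(length) -> subscriber and records every notification.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleObservable<String> source = just(Arrays.asList("Alpha", "Beta", "Gamma"));
        SimpleObservable<Integer> lengths = map(source, String::length);
        lengths.subscribe(new SimpleObserver<Integer>() {
            public void onNext(Integer i) { log.add("onNext " + i); }
            public void onCompleted() { log.add("onCompleted"); }
            public void onError(Throwable e) { log.add("onError " + e); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [onNext 5, onNext 4, onNext 5, onCompleted]
    }
}
```

The anonymous class inside map() is the part worth studying: it receives each notification from the source and immediately forwards a notification to the next link in the chain.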

You can actually create a Subscriber that handles all three of these events, by providing three lambda arguments to handle the onNext(), onError(), and onCompleted() events.


Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .map(s -> s.length())
        .filter(i -> i <= 5)
        .subscribe(l -> System.out.println(l),          // onNext()
                   e -> e.printStackTrace(),            // onError()
                   () -> System.out.println("Done!")    // onCompleted()
        );

Output:

5
4
5
5
Done!

You might think onNext() is the most critical method, as it is what actually passes items. But onCompleted() is just as critical, especially for operators like count() and toList() that only make sense on a finite stream. This notification tells those operators to not expect any more emissions and to push forward the count or List. If you have an infinite Observable that never calls onCompleted(), using toList() or count() will cause those operators to work infinitely and never emit anything. The onCompleted() will also notify the operators to clean up any resources as they are no longer needed.
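The dependence on onCompleted() is easy to see in a toy version of a collecting operator. The sketch below uses only the JDK and is not how RxJava implements toList(). SimpleObserver, ToListObserver, and run() are hypothetical names, and onError() is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a collecting operator; not RxJava's implementation.
class ToListSketch {

    interface SimpleObserver<T> {
        void onNext(T item);
        void onCompleted();
    }

    // Buffers every item and emits the List only once completion arrives.
    static class ToListObserver<T> implements SimpleObserver<T> {
        private final List<T> buffer = new ArrayList<>();
        private final SimpleObserver<List<T>> downstream;

        ToListObserver(SimpleObserver<List<T>> downstream) {
            this.downstream = downstream;
        }

        public void onNext(T item) {
            buffer.add(item);   // nothing is pushed downstream yet
        }

        public void onCompleted() {
            downstream.onNext(buffer);
            downstream.onCompleted();
        }
    }

    // Feeds three items into the operator; sends onCompleted() only if asked to.
    static List<String> run(boolean complete) {
        List<String> log = new ArrayList<>();
        ToListObserver<Integer> toList = new ToListObserver<Integer>(
                new SimpleObserver<List<Integer>>() {
                    public void onNext(List<Integer> list) { log.add("onNext " + list); }
                    public void onCompleted() { log.add("onCompleted"); }
                });
        toList.onNext(5);
        toList.onNext(4);
        toList.onNext(5);
        if (complete) toList.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));    // prints [onNext [5, 4, 5], onCompleted]
        System.out.println(run(false));   // prints []
    }
}
```

When onCompleted() never arrives, the downstream observer sees nothing at all, which is the situation an infinite Observable creates.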

A good way to explore how onNext(), onError(), and onCompleted() behave in an Observable chain is to use the doOnNext(), doOnError(), and doOnCompleted() operators anywhere in the Observable chain. These allow us to "peek" at each emission and do something with it, without affecting the rest of the chain. This is also helpful for debugging. For example, if we are confused why nothing is getting emitted to a Subscriber, we can put a doOnNext() before each operator to see which operator is to blame. Suppose the earlier chain used filter(i -> i <= 3) and had a doOnNext() before map(), before filter(), and before the Subscriber, each printing what it pushes. Since the output never shows Pushing x to Subscriber, we can quickly conclude that filter() is to blame: its condition i <= 3 fails all emissions.
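The chain described above might look like the following reconstruction. It assumes RxJava 1.x (rx.Observable) is on the classpath. The class name DoOnNextDebug, the run() helper, and the log list are hypothetical, added so the messages can be collected and inspected instead of being printed directly from each doOnNext().

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

// Reconstruction for illustration, assuming RxJava 1.x (rx.Observable).
class DoOnNextDebug {

    // Runs the chain and returns every message it recorded, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
                .doOnNext(s -> log.add("Pushing " + s + " to map()"))
                .map(s -> s.length())
                .doOnNext(i -> log.add("Pushing " + i + " to filter()"))
                .filter(i -> i <= 3)   // no length is 3 or less, so nothing passes
                .doOnNext(i -> log.add("Pushing " + i + " to Subscriber"))
                .subscribe(i -> log.add("Received " + i));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```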

Output:

Pushing Alpha to map()
Pushing 5 to filter()
Pushing Beta to map()
Pushing 4 to filter()
Pushing Gamma to map()
Pushing 5 to filter()
Pushing Delta to map()
Pushing 5 to filter()
Pushing Epsilon to map()
Pushing 7 to filter()

 Summary

Hopefully this article has given you a fundamental understanding of RxJava, and I hope its usefulness has already piqued your interest. If you want to learn more about RxJava, I would recommend Learning Reactive Programming with Java 8 by Nickolay Tsvetinov as well as Intro to RxJava by Chris Froussios. Stay posted for future articles and be sure to check out my blog, which covers many practical RxJava topics.

Thomas Nield

Thomas Nield is a software developer and business analyst at Southwest Airlines working in Revenue Management. He is the author of the O'Reilly book Getting Started with SQL and contributes to RxJava, Kotlin, and JavaFX libraries. 

