RxJava introduction for Android developers

RxJava is a Java implementation of Reactive Extensions (ReactiveX). Basically, it is a library for composing asynchronous, event-based programs, built on the Observer pattern.
You can create an asynchronous data stream on any thread, transform the data, and consume it with an Observer on any thread. The library offers operators such as map, combine, merge, filter and many more that can be applied to a data stream.
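
The create, transform, consume flow described above can be sketched as follows. This is only a minimal illustration, assuming RxJava 2 (the `io.reactivex` package) is on the classpath; the class and method names are made up for this example.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class PipelineSketch {
    // Emits 1..5, keeps the even numbers, squares them,
    // and collects whatever the Observer receives.
    static List<Integer> evenSquares() {
        List<Integer> received = new ArrayList<>();
        Observable.range(1, 5)               // source stream: 1, 2, 3, 4, 5
                .filter(n -> n % 2 == 0)     // keep 2 and 4
                .map(n -> n * n)             // transform to 4 and 16
                .subscribe(received::add);   // the Observer consumes each item
        return received;
    }

    public static void main(String[] args) {
        System.out.println(evenSquares()); // prints [4, 16]
    }
}
```

No scheduler is specified here, so everything runs synchronously on the calling thread.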

Below is the list of available schedulers with a brief introduction to each.
- Schedulers.io() – Used for I/O-bound, non-CPU-intensive work such as network calls, disk or file access, and database operations. It is backed by a thread pool that grows as needed and reuses idle threads.
- AndroidSchedulers.mainThread() – Provided by RxAndroid, this gives access to the Android main thread / UI thread. UI updates and user interactions usually happen on this thread. We shouldn't perform intensive operations here, as that makes the app glitchy or can trigger an ANR (Application Not Responding) dialog.
- Schedulers.newThread() – Creates a new thread each time a task is scheduled. It is usually suggested not to use this scheduler unless there is a very long-running operation, because threads created via newThread() are not reused.
- Schedulers.computation() – Used for CPU-intensive work such as processing large amounts of data or bitmaps. By default, the number of threads in this scheduler equals the number of available CPU cores.
- Schedulers.single() – Executes all tasks sequentially on a single background thread, in the order they are added. Use it when sequential execution is required.
- Schedulers.immediate() – Executes the task immediately and synchronously, blocking the current thread. This scheduler exists only in RxJava 1; RxJava 2 removed it in favour of Schedulers.trampoline().
- Schedulers.trampoline() – Executes tasks in first-in, first-out order on the current thread. A newly scheduled task is queued and runs after the task in progress finishes; no background thread is involved.
- Schedulers.from() – Creates a scheduler from an Executor you supply, so the executor determines how many threads are used. With a fixed-size pool, tasks are queued while all threads are busy.
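
As a rough sketch of how a scheduler is attached to a stream, the example below wraps a single-thread executor with Schedulers.from() and moves the work onto it with subscribeOn(). It assumes RxJava 2 on the classpath; the class name and the "custom-worker" thread name are hypothetical. On Android you would typically add observeOn(AndroidSchedulers.mainThread()) from RxAndroid before subscribing, which is left out here so the code runs on a plain JVM.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerSketch {
    // Returns the name of the thread on which the upstream work actually ran.
    static String workerThreadName() {
        // Hypothetical executor with one recognizably named thread.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-worker"));
        Scheduler scheduler = Schedulers.from(executor);
        try {
            return Observable.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(scheduler) // run the callable on the executor's thread
                    .blockingFirst();       // block the caller until the value arrives
        } finally {
            executor.shutdown();            // release the thread when done
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName()); // prints custom-worker
    }
}
```

blockingFirst() is used only to keep the sketch self-contained; in an app you would subscribe normally instead of blocking.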


RxJava is all about two key components: Observable and Observer.
In addition to these, there are other concepts such as Schedulers, Operators, Subscription and Disposable.
Observable: An Observable is a data stream that does some work and emits data.
Observer: An Observer is the counterpart of an Observable. It receives the data emitted by the Observable.
Subscription: The link between an Observable and an Observer is called a Subscription. Multiple Observers can subscribe to a single Observable.

Operator / Transformation: Operators modify the data emitted by an Observable before an Observer receives it.
Schedulers: Schedulers decide the thread on which an Observable emits data and the thread on which an Observer receives it, e.g. a background thread or the main thread.
Disposable: A Disposable is used to dispose of the subscription when an Observer no longer wants to listen to an Observable. On Android, disposables are very useful for avoiding memory leaks.




Composite disposable with custom object
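
A CompositeDisposable collects several disposables so they can be released together. The sketch below is one possible illustration, assuming RxJava 2 on the classpath. The `Note` class, the `PublishSubject` standing in for a long-lived source, and all other names are hypothetical.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

class DisposableSketch {
    // Hypothetical custom object emitted by the stream.
    static final class Note {
        final String title;
        Note(String title) { this.title = title; }
    }

    // Returns the titles received before the subscriptions were cleared.
    static List<String> titlesReceivedBeforeClear() {
        List<String> titles = new ArrayList<>();
        PublishSubject<Note> notes = PublishSubject.create(); // stand-in for a long-lived source
        CompositeDisposable disposables = new CompositeDisposable();

        // Track the subscription so it can be disposed later.
        disposables.add(notes.subscribe(note -> titles.add(note.title)));
        notes.onNext(new Note("first"));   // delivered

        disposables.clear();               // e.g. called when the screen is destroyed
        notes.onNext(new Note("second"));  // not delivered: the subscription is disposed
        return titles;
    }

    public static void main(String[] args) {
        System.out.println(titlesReceivedBeforeClear()); // prints [first]
    }
}
```

clear() disposes the tracked subscriptions but leaves the container reusable, whereas dispose() also rejects any disposable added afterwards.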

RxJava 2 provides the following observable types.
* Observable
* Single
* Maybe
* Flowable
* Completable


Corresponding to these, we have the following Observers. (Flowable is consumed by a Reactive Streams Subscriber rather than an Observer type.)
* Observer
* SingleObserver
* MaybeObserver
* CompletableObserver


Single always emits exactly one value or signals an error. The same job can be done with an Observable that emits a single item, but Single guarantees that there is either exactly one emission or an error. A typical use case for Single is a network call, where the response is fetched in one go.
Completable does not emit any data; instead it reports the status of the task, either success or failure. Use it when you want to perform some task and do not expect a value back. A use case would be updating some data on the server with a PUT request.
Flowable should be used when the source generates more events/data than the consumer can handle, because it supports backpressure. As per the docs, Flowable can be used when the source is generating 10k+ events and the subscriber cannot consume them all.
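
The differences between these types can be seen in a small sketch. It assumes RxJava 2 on the classpath and uses blocking calls and test helpers purely to keep the example short; all class and method names are hypothetical.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicBoolean;

class SourceTypesSketch {
    // Single: exactly one value or an error.
    static String singleValue() {
        return Single.fromCallable(() -> "response").blockingGet();
    }

    // Maybe: one value, no value, or an error. This one is empty,
    // so the supplied default is returned.
    static String maybeValue() {
        return Maybe.<String>empty().blockingGet("fallback");
    }

    // Completable: no value at all, only completion or an error.
    static boolean completableRan() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Completable.fromAction(() -> ran.set(true)).blockingAwait();
        return ran.get();
    }

    // Flowable: the subscriber requests only 5 items, so the source
    // emits only 5 of its 10,000 values (backpressure).
    static int flowableDeliveredCount() {
        TestSubscriber<Integer> subscriber = Flowable.range(1, 10_000).test(5);
        return subscriber.valueCount();
    }

    public static void main(String[] args) {
        System.out.println(singleValue());            // prints response
        System.out.println(maybeValue());             // prints fallback
        System.out.println(completableRan());         // prints true
        System.out.println(flowableDeliveredCount()); // prints 5
    }
}
```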

Map, FlatMap, ConcatMap and SwitchMap apply a function to, or modify, the data emitted by an Observable.
- Map modifies each item emitted by a source Observable and emits the modified item.
- FlatMap, SwitchMap and ConcatMap also apply a function to each emitted item, but instead of returning a modified item, the function returns an Observable, which can itself emit any number of items.
- FlatMap and ConcatMap work in much the same way. They merge the items emitted by multiple Observables into a single Observable.
- The difference between FlatMap and ConcatMap is the order in which the items are emitted.
- FlatMap can interleave items while emitting, i.e. the order of the emitted items is not guaranteed.
- ConcatMap preserves the order of items. Its main disadvantage is that it has to wait for each Observable to complete before subscribing to the next one, so the inner Observables do not run concurrently.
- SwitchMap is a bit different from FlatMap and ConcatMap. SwitchMap unsubscribes from the previous inner Observable whenever the source emits a new item, so it only ever emits items from the most recent Observable.
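
The ordering differences are easiest to see when the inner Observables finish at different times. The sketch below is one way to show that deterministically, assuming RxJava 2 on the classpath. It uses a TestScheduler (virtual time) so no real waiting happens; the class, the helper names and the delays are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;

class MapVariantsSketch {
    // Virtual-time scheduler so the delays below run deterministically.
    private final TestScheduler scheduler = new TestScheduler();

    // Inner Observable: emits x * 10 after a delay that is longer for smaller x
    // (1 -> 300 ms, 2 -> 200 ms, 3 -> 100 ms).
    private Observable<Integer> slowTimesTen(int x) {
        return Observable.just(x * 10)
                .delay((4 - x) * 100L, TimeUnit.MILLISECONDS, scheduler);
    }

    // Subscribes, advances virtual time by one second, and returns what was received.
    private List<Integer> collect(Observable<Integer> stream) {
        TestObserver<Integer> observer = stream.test();
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return observer.values();
    }

    static List<Integer> withFlatMap() {
        MapVariantsSketch s = new MapVariantsSketch();
        // All inner Observables run at once; the fastest one emits first.
        return s.collect(Observable.just(1, 2, 3).flatMap(s::slowTimesTen));
    }

    static List<Integer> withConcatMap() {
        MapVariantsSketch s = new MapVariantsSketch();
        // Each inner Observable starts only after the previous one completes.
        return s.collect(Observable.just(1, 2, 3).concatMap(s::slowTimesTen));
    }

    static List<Integer> withSwitchMap() {
        MapVariantsSketch s = new MapVariantsSketch();
        // A new source item cancels the previous inner Observable.
        return s.collect(Observable.just(1, 2, 3).switchMap(s::slowTimesTen));
    }

    public static void main(String[] args) {
        System.out.println(withFlatMap());   // prints [30, 20, 10]
        System.out.println(withConcatMap()); // prints [10, 20, 30]
        System.out.println(withSwitchMap()); // prints [30]
    }
}
```

In the switchMap case the source emits 1, 2 and 3 back to back, so the first two inner Observables are disposed before their delays elapse and only the last one gets to emit.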





Buffer gathers items emitted by an Observable into batches and emits each batch instead of emitting one item at a time.
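
As a small illustration of Buffer, the sketch below groups seven numbers into batches of three. It assumes RxJava 2 on the classpath; the class and method names are hypothetical.

```java
import io.reactivex.Observable;
import java.util.List;

class BufferSketch {
    // Collects the items 1..7 into lists of at most three items each.
    static List<List<Integer>> batchesOfThree() {
        return Observable.range(1, 7)
                .buffer(3)       // emit a List<Integer> for every 3 items
                .toList()        // gather all batches into one list
                .blockingGet();  // block until the stream completes
    }

    public static void main(String[] args) {
        System.out.println(batchesOfThree()); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

The last batch is smaller because the source completes before a full batch of three is available.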

The Debounce operator emits an item only if a specified timespan has passed without another item being emitted. This operator is very useful when the Observable emits items rapidly but you only want to react once the burst has settled.
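
A common use of Debounce is a search box, where only the text typed before a pause should trigger a request. The sketch below simulates that with a PublishSubject and a TestScheduler (virtual time), assuming RxJava 2 on the classpath. The 300 ms window, the sample keystrokes and all names are assumptions for illustration.

```java
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

class DebounceSketch {
    // Returns the queries that survive a 300 ms debounce window.
    static List<String> debouncedQueries() {
        TestScheduler scheduler = new TestScheduler();           // virtual clock
        PublishSubject<String> keystrokes = PublishSubject.create();
        TestObserver<String> observer =
                keystrokes.debounce(300, TimeUnit.MILLISECONDS, scheduler).test();

        keystrokes.onNext("r");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);     // too soon, "r" is dropped
        keystrokes.onNext("rx");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);     // too soon, "rx" is dropped
        keystrokes.onNext("rxjava");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);     // quiet period: "rxjava" is emitted
        keystrokes.onNext("rxjava 2");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);     // quiet period: "rxjava 2" is emitted
        return observer.values();
    }

    public static void main(String[] args) {
        System.out.println(debouncedQueries()); // prints [rxjava, rxjava 2]
    }
}
```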

The Concat operator combines the output of two or more Observables into a single Observable. Concat always maintains sequential execution without interleaving the emissions, so the first Observable completes its emissions before the second starts, and so forth if there are more Observables.

Merge also combines multiple Observables into a single Observable, but it does not maintain sequential execution, so emissions from the sources can interleave.
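
The difference between Concat and Merge shows up when two timed sources overlap. The following sketch assumes RxJava 2 on the classpath and again uses a TestScheduler so the timing is deterministic; the labels, periods and names are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;

class CombineSketch {
    private final TestScheduler scheduler = new TestScheduler(); // virtual clock

    // Emits label+0 and label+1, one item every periodMs of virtual time.
    private Observable<String> ticks(String label, long periodMs) {
        return Observable.interval(periodMs, TimeUnit.MILLISECONDS, scheduler)
                .take(2)
                .map(i -> label + i);
    }

    // Subscribes, advances virtual time by one second, and returns what was received.
    private List<String> collect(Observable<String> stream) {
        TestObserver<String> observer = stream.test();
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return observer.values();
    }

    static List<String> concatenated() {
        CombineSketch s = new CombineSketch();
        // B is subscribed only after A has completed.
        return s.collect(Observable.concat(s.ticks("A", 100), s.ticks("B", 150)));
    }

    static List<String> merged() {
        CombineSketch s = new CombineSketch();
        // A and B are subscribed together, so their items interleave by time.
        return s.collect(Observable.merge(s.ticks("A", 100), s.ticks("B", 150)));
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // prints [A0, A1, B0, B1]
        System.out.println(merged());       // prints [A0, B0, A1, B1]
    }
}
```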


For sample code, refer to https://github.com/sachinsandbhor/MVP-Architecture-using-kotlin
