Understanding MVVM on Android Tutorial 06 – Refactoring our code with RxJava

It’s time to go reactive! For our tutorial series, we are building a simple app which accepts a username, accesses the Github REST API and displays the user’s repos. So what events should we be observing and responding to? Or, thinking in “Reactive Programming” terms , what are our “Observables”?


Some examples of possible events/Observables are:
1. When or if we get data back from our Rest API (Model Layer)
2. Text being written in the EditText so we can do some real-time validation. (View Layer)
3. When the user hits the “Submit” button (View Layer)

We want to keep this example simple, but there are so many other events/observables we could be monitoring, such as:
4. When results are first displayed to the user
5. If the user clicks any items in the returned data
6. When the user scrolls to the end of the list and so on and so forth…

For now we will only concern ourselves with the first three events. The first step we’ll take is adding the reactive libraries to our app’s build.gradle

    /*
    * Core Library RxJava.
    */    
    compile 'io.reactivex.rxjava2:rxjava:2.0.8'
    /*
    * Rx Android adds android specific bindings for RxJava. Specifically 
    * AndroidSchedulers.mainThread() which allows you to switch between 
    * the background thread and the main thread on your observable's completion.
    */
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    /*
    * To convert Retrofit "Call" objects to return type "Observable",
    * we have to set the call adapter to RxJavaCallAdapter. 
    * See the link here for more information
    * https://github.com/square/retrofit/tree/master/retrofit-adapters/rxjava2 
    */
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'

Now we’ll refactor the code, starting with our networking call.

[EVENT 1] : When/If we get data back from our Rest API (Model Layer)
Previously, we defined our interface for accessing the GitHub API like so.

package com.kyubid.sample.mvvmtutorial.networking;

import com.kyubid.sample.mvvmtutorial.model.Repo;
import java.util.List;
import retrofit2.Call;
import retrofit2.http.GET;
import retrofit2.http.Path;

public interface GithubService {
    String ENDPOINT = "https://api.github.com";

    @GET("/users/{user}/repos")
    Call<List<Repo>> reposForUser (@Path("user") String user);
}

Refactoring our code for Rx, simply involves changing the “Call” object to an “Observable”.

package com.kyubid.sample.mvvmtutorial.networking;

import com.kyubid.sample.mvvmtutorial.model.Repo;
import java.util.List;
import retrofit2.http.GET;
import retrofit2.http.Path;
import io.reactivex.Observable;

public interface GithubService {
    String ENDPOINT = "https://api.github.com";

    @GET("/users/{user}/repos")
    Observable<List<Repo>>  reposForUser(@Path("user") String user);
}

In the MainActivity.java , the fetchGitHub(…) method changes from this:

   
private void fetchGitHub(String user) {

   Gson gson = new GsonBuilder()
      .create();

   Retrofit retrofit = new Retrofit.Builder()
      .baseUrl(GithubService.ENDPOINT)
      .addConverterFactory(GsonConverterFactory.create(gson))
      .build();

   GithubService githubClient = retrofit.create(GithubService.class);

   // Execute the call asynchronously.
   call.enqueue(new Callback<List<Repo>>() {
        
   @Override
   public void onResponse(Call<List<Repo>> call, Response<List<Repo>> response) {
      if (response.isSuccessful()) {
         data = response.body();
         adapter = new RecyclerAdapter(data);
         recyclerView.setAdapter(adapter);
     } else {
         //handle error
     }
            }

   @Override
   public void onFailure(Call<List<Repo>> call, Throwable t) {
      // the network call was a failure
      Log.d(TAG, "Error Occurred");
            }
        });

    }
}

to this:

   
private void fetchGitHub(String user) {

   Retrofit retrofit = new Retrofit.Builder()
      .baseUrl(GithubService.ENDPOINT)
      //Addition of this line changes Retrofit Call object to Observable
      .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
      .addConverterFactory(GsonConverterFactory.create())
      .build();

  GithubService githubClient = retrofit.create(GithubService.class);
        
  //1. Note how we're using the exact object type returned from Retrofit here
  Observable<List<Repo>> reposReturnedObservable = githubClient.reposForUser(user);

  reposReturnedObservable
     .subscribeOn(Schedulers.newThread())
     .observeOn(AndroidSchedulers.mainThread())
     //2. ...and here
     .subscribe(new Observer<List<Repo>>() {
        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }

        @Override
        public void onSubscribe(Disposable d) {
           compositeDisposable.add(d);
        }

        @Override
        //3. ...and here
        public void onNext(List<Repo> repos) {
           data = repos;
           adapter = new RecyclerAdapter(data);
           recyclerView.setAdapter(adapter);
        }
    });
}

Let’s break down what we have done step by step.

Observable<List> reposReturnedObservable = githubClient.reposForUser(user);

This is where we create our Observable and assign it to the Observable object being returned from our Retrofit Call.
Note: we create our Observable with the same object type returned from Retrofit.

We then specify our schedulers or perform some “thread management” for our particular needs

.subscribeOn(Schedulers.newThread())

Schedulers are the method RxJava uses to introduce multi-threading into our apps. In our app we use the Schedulers.newThread() method, this starts a new thread for the work done by the Observable. Note that whatever thread the Observable is created on is responsible for executing the calls from the Observable. So if we had not specified Schedulers.newThread(), our Observable call would have been carried out our app’s main thread causing our app to freeze. RxJava offers six kinds of Schedulers, described in the documentation as:

Schedulers.io() — create and return a scheduler for I/O operations
Schedulers.computation() — create and return a scheduler for any computations not related to I/O
Schedulers.immediate() — this scheduler immediately starts work on the current thread
Schedulers.newThread() — this scheduler starts a new thread for the specified work
Schedulers.trampoline() — create and returns a scheduler that queues work on the current thread to be executed after the current work completes.
Schedulers.from() — converts a java.util.concurrent.Executor into a new Scheduler instance. *Use this if you want to create your own custom Scheduler.

The two Schedulers, you are likely to come across often are Schedulers.io() and Schedulers.computation(). The description pulled from the documentation above are quite vague but essentially the difference between the two comes down to how CPU-intensive you expect your operation to be.
Computational tasks are tasks that require continuous use of the CPU like complex calculations or other intensive processing. So the number of threads you have available to perform computational tasks is limited to the number of “cores”, or processors built into the CPU. For example, if you have a quad-core, you can optimally support 5 computational threads (4 cores + 1 extra thread for idle time). If you have 8 cores, you can optimally support 9 threads, and so on. If you exceed this simple rough formula (e.g. running 6 threads or more on a 4-core machine) you risk compromising performance as one of your cores will be forced to switch between threads.
However I/O operations (file system access, database/network calls) don’t really need the CPU the whole time as they just sleep until the I/O operation is done. So it would be perfectly fine to have 1000 I/O operations on a single-core machine, since they’re asleep the majority of the time. I will include a link at the end of this post a really helpful blog post that helps further explain the difference between these two.

.observeOn(AndroidSchedulers.mainThread())

We used Schedulers.newThread() to carry out our Observable call on a new thread but if we want to update our UI with the returned data from our Observable we need to switch back to the Main/UI thread. Luckily for us, the RxAndroid package we added to the project provides a AndroidSchedulers.mainThread() method which allows us to observe the data from the Observable on the main UI thread. Essentially, we have executed the code on a new thread but we need to be back on the UI thread when the results come in.

Finally, we subscribe to the observable which will cause the Observable to start emitting events. We subscribe with an Observer object which possesses the onError, onComplete, onSubscribe and onNext methods. Note that any data from the Observable is returned in the onNext (…) method.

    .subscribe(new Observer<List<Repo>>() {
                    @Override
                    public void onError(Throwable e) {
                        // Called when the observable encounters an error
                    }

                    @Override
                    public void onComplete() {
                        // Called when the observable has no more data to emit
                    }

                    @Override
                    public void onSubscribe(Disposable d) {
                        // Cancel the connection with the Observable.
                        compositeDisposable.add(d);
                    }

                    @Override
                    public void onNext(List<Repo> repos) {
                        // Called each time the observable emits data
                        // i.e. data from your Observable is received here
                        data = repos;
                        adapter = new RecyclerAdapter(data);
                        recyclerView.setAdapter(adapter);
                    }
                });

 

[EVENT 2] : Text being written in the EditText (View Layer)
To handle events from our View/UI layer, we’ll be making use of a library called RxBinding which was built by Square to handle interacting with UI components in a reactive way. We’ll include the library with the following line in the app’s build.gradle.

compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'

For every UI/Design library you are using in your app, include the corresponding RxBinding library. In our app, we are using the following libraries:

compile 'com.android.support:appcompat-v7:25.1.0'
compile 'com.android.support:recyclerview-v7:25.1.0'

So we will need to include the following RxBinding libraries to our project:

compile 'com.jakewharton.rxbinding2:rxbinding-appcompat-v7:2.0.0'
compile 'com.jakewharton.rxbinding2:rxbinding-recyclerview-v7:2.0.0'

To enable error messages on our EditText, we will be wrapping it in a special widget called the TextInputLayout. This widget is contained in Android’s Design package, so we will also include the following two lines in our build.gradle

compile 'com.android.support:design:25.3.1'
compile 'com.jakewharton.rxbinding2:rxbinding-design:2.0.0'

Note: After syncing your project, you may have some errors relating to conflicts with dependency ‘com.google.code.findbugs:jsr305’. You can fix this by including the following in your app’s build.gradle. You can more about this particular issue here.

android {
    ....

    configurations.all {
        resolutionStrategy.force 'com.google.code.findbugs:jsr305:1.3.9'
    }
}

To monitor text changes in the EditText, we will first create the Observable:

Observable<CharSequence> editTextObservable = RxTextView.textChanges(editText); 

Then we perform some operations on our Observable and subscribe to it.

editTextObservable
   .skip(1) //1
   .debounce(500, TimeUnit.MILLISECONDS) //2
   .observeOn(AndroidSchedulers.mainThread())  //3
   .subscribe(new Observer<CharSequence>() { //4
      @Override
      public void onError(Throwable e) {
      }

      @Override
      public void onComplete() {
      }

      @Override
      public void onSubscribe(Disposable d) {
         compositeDisposable.add(d);
      }

      @Override
      public void onNext(CharSequence charSequence) {
         //disable the button until after validity check
         button.setEnabled(false);
         textInputLayout.setErrorEnabled(false);

         String strUsername = charSequence.toString().trim();

         //Regex for Github username rules
         Pattern pattern = Pattern.compile("^[a-zA-Z0-9]+(?:[A-Za-z0-9-])*+([a-zA-Z0-9])*\\b$");
         Matcher matcher = pattern.matcher(strUsername);

         if (strUsername.matches("")) {
            //empty string
            textInputLayout.setError(getResources().getString(R.string.err_msg_emptybox));
            textInputLayout.setErrorEnabled(true);
         }else if (!matcher.matches()){
            //string contains spaces, underscores or non-alphanumeric characters
            textInputLayout.setError(getResources().getString(R.string.err_msg_username));
            textInputLayout.setErrorEnabled(true);
         }else {
            //string is valid
            button.setEnabled(true);
         }

     }
  });

What did we do?
1. According to the documentation, “RxTextView.textChanges() emits the current text value of the TextView before emitting any changes”. This means that the Observable will fire an event on app load and before our user has made any changes. We are only interested in changes the user makes , so we will skip the first event using the skip() method.
2. We don’t want to react to an event every time the user types a character. We would rather give the user some time to write and then react to what they have entered. The debounce operator we included will only notify us of text changes after every 500 milliseconds.
3. We want to receive our results back on the main Thread because we will be changing a UI component: TextInputLayout (…if there is an error message to display).
4. We subscribe to the Observable and do our text validation with the data received in the “OnNext” method.

 

[EVENT 3] : When the user hits the “Submit” button (View Layer)
Previously, we defined the events from our button like this:

private void initViews(){
   Button button = (Button) findViewById(R.id.button);

   button.setOnClickListener(new View.OnClickListener() {
      @Override
      public void onClick(View view) {
         recyclerView.removeAllViewsInLayout();
         String strGithubUser = editText.getText().toString().trim();
         fetchGitHub(strGithubUser);
         }
      });
}

To factor the code reactively, the code changes to:

private void initViews(){
   Button button = (Button) findViewById(R.id.button);

   Observable<Object> buttonObservable = RxView.clicks(button);

   buttonObservable
      .subscribe(new Observer<Object>() {
         @Override
         public void onSubscribe(Disposable d) {
            compositeDisposable.add(d);
         }

         @Override
         public void onNext(Object o) {
            recyclerView.removeAllViewsInLayout();
            String strGithubUser = editText.getText().toString().trim();
            fetchGitHub_2(strGithubUser);
         }
                    
         @Override
         public void onError(Throwable e) {
         }
                    
         @Override
         public void onComplete() {
         }
   });
}

The diagram below shows a helpful way to remember the structure of Observables.

Avoid Memory Leaks and Other Problems
Using RxJava introduces a high potential for memory leaks in your application. One commonly occurring scenario is where the Android system tries to destroy an Activity that contains a running Observable. Since the observable is still running, its observer will be holding a reference to the activity, and the system will be unable to garbage collect this activity. Or the Android system may successfully destroy the activity causing a memory leak when the Observer returns. So you must remember to shut down all your Observers/close all your disposables when your activity shuts down.

In our app, I have used a container object available in the RxJava library called CompositeDisposable which can hold a reference to all the disposables in our activity. If you have another look at the code above, you will see I added the disposable for each Observer to the CompositeDisposable in the onSubscribe(Disposable d) method. Finally, I cleared all the disposables when the activity shuts down as shown below:

public class ReaderActivity extends AppCompatActivity {    
private RecyclerView recyclerView;    
private List<Repo> data;    
private EditText editText;    
private Button button;    
private RecyclerAdapter adapter;    
private TextInputLayout textInputLayout;    
private CompositeDisposable compositeDisposable;    

@Override    
protected void onStop() {  
    compositeDisposable.clear();      
    super.onStop();            
    }
}

To avoid any confusion, in RxJava1.x disposables were referred to as “subcriptions”.
Another option, to handle memory management with RxJava is to use the The RxLifecycle library, developed by Trello. This provides lifecycle handling APIs that you can use to limit the lifespan of an Observable to the lifecycle of an Activity or Fragment.

Thats it! All of our 3 events are now being handled reactively. In the next post, I will write some final notes on RxJava and then we’ll move on to creating the ViewModel.

Upcoming posts in the series:
Understanding MVVM on Android Tutorial 07 – More Tips and Tricks with RxJava
Understanding MVVM on Android Tutorial 08 – Creating the ViewModel

Further Reading:
http://tomstechnicalblog.blogspot.co.uk/2016/02/rxjava-understanding-observeon-and.html
http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases
https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples/blob/e8fbc22a21687f5d55edd93d10de163d6f6221e9/README.md

Leave a Reply