android – Convert RxJava Observables To Live Data With Kotlin Extension Functions-ThrowExceptions

Exception or error:

I’ve been using alot of RxJava Observables converted to LiveData in my code using LiveDataReactiveStreams.fromPublisher() library. So I though of adding an extension function to the RxJava Observable to easily convert them to LiveData.

These are my extension functions:

fun <T> Flowable<T>.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this)
}

fun <T> Observable<T>.toLiveData(backPressureStrategy: BackpressureStrategy) :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable(backPressureStrategy))
}

fun <T> Single<T>.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
}

fun <T> Maybe<T>.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
}

fun <T> Completable.toLiveData() :  LiveData<T> {
    return LiveDataReactiveStreams.fromPublisher(this.toFlowable())
}

My questions are:

  1. Is this a good idea?
  2. Is there a better way of doing this?
  3. Could these extension functions be better?

P.S.

I’m new to Kotlin and so I am asking these question. Anything helpful would be appreciated. Thank you very much.

How to solve:

I think this is a pretty good idea. An example benefit of LiveData is the ability of using it directly in your data-binding layouts. Let’s say in your view-model you have:

val user: LiveData<User>

data class User(val firstName: String, val lastName: String)

in your layout you can bind the properties of the User directly:

android:text="${viewModel.user.firstName}"

You can’t use reactive streams in data-binding like this. If user was Flowable<User>, referencing ${viewModel.user.firstName} wouldn’t work.

Furthermore, data-binding will handle the lifecycle for you (observing changes only in active state and updating the view when changes happen) if your activity or fragment calls ViewDataBinding.setLifecycleOwner(LifecycleOwner):

binding.setLifecycleOwner(this)

The one for converting Completable to LiveData<T> doesn’t really make sense to me, because it will never notify the observer about anything, so I’d just get rid of it.

There are some considerations when converting from reactive streams to live data (like I had when I wanted to resume a countdown after rotation), but I don’t think they are related to the extension functions you presented, those seem to do their job. The issue to keep in mind here is that when the lifecycle owner moves from active to inactive state, PublisherLiveData cancels the subscription to the stream, and when the state changes to active, it will create a new subscription, which means restarting the stream in many cases (I guess that is if the stream is “cold”), while you probably want to resume the stream from where it was after rotation or other configuration changes. If the stream was “hot” on the other hand, emissions get ignored during the inactive state. I think this problem has to be addressed even if you used reactive streams directly and handled life cycle manually. But the thing is that simply converting reactive streams to LiveData isn’t enough to solve this problem.

It’s good to document those methods as not handling the error state, which has to be handled upstream. Alternatively, that could be one of the improvements for these functions – transforming the stream first to handle errors (as a lambda parameter with a default for example). Another possibility would be to utilize Result (experimental at the moment), or something similar to encapsulate the success or error.


As an afterthought, regarding this part that I wrote above:

There are some considerations when converting from reactive streams to live data, but I don’t think they are related to the extension functions you presented.

I still think it holds true in general, however I’m not sure if you actually want to use Single.toLiveData() and Maybe.toLiveData() in practice most of the time. Since Maybe and Single are modeling one-time operations, then it may be preferable not to cancel it when there are no active observers and have to re-start it once there are new active observers. Instead, posting to some MutableLiveData and disposing the Single/Maybe in onCleared might be useful (I’m not sure that can be encapsulated in an extension function). They still may have some use that I simply don’t see at the moment.

By the way, your Flowable.toLiveData() is already in the androidx.lifecycle:lifecycle-reactivestreams-ktx artifact.

This leaves the Observable.toLiveData(), which I think should be just as useful as Flowable one.

###

Your solution is fine if you want to use LiveData and Rx together.

If you just want to auto-dispose your subscription you could implement it on Disposable like this:

private class LifecycleDisposable(obj: Disposable) :
        DefaultLifecycleObserver, Disposable by obj {
    override fun onStop(owner: LifecycleOwner) {
        if (!isDisposed) {
            dispose()
        }
    }
}

fun Disposable.attachToLifecycle(owner: LifecycleOwner) {
    owner.lifecycle.addObserver(LifecycleDisposable(this))
}

and call it like

Observable.just(1, 2, 3).subscribe().attachToLifecycle(this)

where this references any LifecycleOwner.

###

Theese functions can be a little shorter, but that’s not a good idea at all. Rx gives much more possibilities, than LiveData listeners

Leave a Reply

Your email address will not be published. Required fields are marked *