android – Rx Observable emitting values periodically-ThrowExceptions

Exception or error:

I have to poll some RESTful endpoint periodically to refresh my android app’s data. I also have to pause and resume it based on connectivity (if the phone is offline, there’s no need to even try). My current solution is working, but it uses standard Java’s ScheduledExecutorService to perform periodic tasks, but I’d like to stay in Rx paradigm.

Here’s my current code, parts of which are skipped for brevity.

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservable is basically a broadcast receiver wrapped into Observable<Boolean>, indicating that the phone is connected to the network.

As I said, this solution is working, but I want to use Rx approach for periodic polling and emitting new UserProfiles, because there are numerous problems with scheduling things manually, which I want to avoid. I know about Observable.timer and Observable.interval, but can’t figure out how to apply them to this task (and I’m not sure if I need to use those at all).

How to solve:

There are a few approaches on this GitHub issue that you might find helpful.

https://github.com/ReactiveX/RxJava/issues/448

The three implementations are:


Observable.interval

Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
        .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
            public Observable<Notification<AppState>> call(Long seconds) {
                return lyftApi.updateAppState(params).materialize(); } });

Scheduler.schedulePeriodically

Observable.create({ observer ->
        Schedulers.newThread().schedulePeriodically({
            observer.onNext("application-state-from-network");
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }).take(10).subscribe({ v -> println(v) });

Manual Recursion

Observable.create(new OnSubscribeFunc<String>() {
        @Override
        public Subscription onSubscribe(final Observer<? super String> o) {
            return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
                @Override
                public Subscription call(Scheduler inner, Long t2) {
                    o.onNext("data-from-polling");
                    return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String v) {
            System.out.println("output: " + v);
        }
    });

And the conclusion is that manual recursion is the way to go because it waits until the operation is completed before scheduling the next execution.

###

One of options is to use Observable.interval and checking the user state when the intervals are emitted:

     Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);

    //pulling the user data
    Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
        Random random = new Random();
        @Override
        public Observable<String> call(Long tick) {
            //here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations
            switch(random.nextInt(10)){
                case 0://suppose this is for cases when network in  not available or exception happens
                    return Observable.<String>just(null);
                case 1:
                case 2:
                    return Observable.just("Alice");
                default:
                    return Observable.just("Bob");
            }
        }
    });

    Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
        @Override
        public Observable<? extends String> call(Observable<String> stringObservable) {
            return stringObservable;
        }
    });

    //filter valid data
    Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s != null;
        }
    });

    //publish only changes
    Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();

You can do it even simpler if your networkStatusObservable is emitting events at least as frequently as you need to check user data

 networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/)

Finally you can can create observable which uses scheduler to emit the user states periodically – refer to Schedulers documentation to learn which scheduler fit you needs the best:

public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{
    private final Scheduler scheduler;
    private final long initialDelay;
    private final long period;
    private final TimeUnit unit;

    public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
        this.scheduler = scheduler;
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
    }

    abstract T next() throws Exception;


    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Scheduler.Worker worker = scheduler.createWorker();
        subscriber.add(worker);
        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                try {
                    subscriber.onNext(next());
                } catch (Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        worker.unsubscribe();
                    }
                }
            }

        }, initialDelay, period, unit);
    }
}

//And here is the sample usage
 Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){
        Random random = new Random();
        @Override
        String next() throws Exception {
            //if you use Schedulers.io, you can call the remote service synchronously
            switch(random.nextInt(10)){
                case 0:
                    return null;
                case 1:
                case 2:
                    return "Alice";
                default:
                    return "Bob";
            }
        }
    });

###

Short answer. RxJava2:

Observable.interval(initialDelay, unitAmount, timeUnit)
            .subscribe(value -> {
                // code for periodic execution
            });

Choose initialDelay, unitAmount and TimeUnit according to your needs.

Example: 0, 1, TimeUnit.MINUTES.

###

There is a simpler way to do it by using interval(). I have tested this code and it works.
But first, you should encapsulate the job you want to periodically execute in a subclass of Action1.

class Act<T> implements Action1<T> {
     public Service service;
     public String data;
     public void call(T t){
         service.log(data); //the periodic job
     }
}

(I have kept the fields public for brevity, but that isn’t advisable). Now you can schedule it the following way:

Act<Long> act=new Act<>();
act.data="dummy data";
act.service=this;
Observable.interval(0l, period, TimeUnit.SECONDS).subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10))).subscribe((Action1<Long>)act);

This will not block your threads anywhere, unlike the approach given in the other answer. This approach allows us to pass a variable as a kind of mutable storage inside the Action which could be handy in subsequent invocations. Also, this way you could subscribe your call on your own thread pool.

###

Okay, I’ll post my own solution, maybe someone will benefit from it. I’ll only post the part related to the question, omitting the HTTP and caching stuff. Here’s how I do it:

private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable,
                                                                          final Observable<Boolean> pauseResumeObservable) {

    final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable,
            new Func2<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean networkAvailable, Boolean mustPause) {
                    return mustPause && networkAvailable;
                }
            }
    ).distinctUntilChanged();

    final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean networkAvailable) {
            return networkAvailable;
        }
    });
    final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean networkAvailable) {
            return !networkAvailable;
        }
    });


    return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() {
        @Override
        public Observable<Long> call(Boolean shouldResumeRequests) {
            if (shouldResumeRequests) {
                long timeToUpdate;
                final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt();
                timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis());
                Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS));
                return Observable
                        .timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS)
                        .takeUntil(hasToStopObservable);
            } else {
                Log.d(TAG, "Have to pause updates");
                return Observable.<Long>never().takeUntil(hasToResumeObservable);
            }
        }
    }).multicast(PublishSubject.<Long>create());
}

As you can see, the conditions to pause or resume updates become a bit more complicated, with a new Observable added to support pausing when app goes to background.

Then at the core of the solution is the concatMap operation which emits the Observables sequentially (hence concatMap, not flatMap, see this question: What is the difference between concatMap and flatMap in RxJava). It emits either interval or never Observables, depending on whether updates should be continued or paused. Then every emitted Observable is takenUntil ‘an opposite’ Observable emits new value.

ConnectableObservable is returned because the created Observable is hot, and all the intended subscribers have to subscribe to it before it starts emitting something, otherwise initial events could be lost. I call connect on it later.

I’ll accept either my or another answer based on votes, if any.

Leave a Reply

Your email address will not be published. Required fields are marked *