java – How can I replace inter thread communication using volatile variables with rxjava?-ThrowExceptions

Exception or error:

I’ve got an application that does a lot of communication between threads by having one thread set a volatile variable on some object which another thread checks. I find this to be very error prone, and I want to try and replace it using RxJava bu there are some cases I can’t figure out how to convert.

The case I’m struggling with right now is where I have two threads, lets call one the controller and the other the measurer. The measurer’s job is to record some quantity every 100ms. The controller does a lot of work talking to various pieces of the app and every so often it will tell the measurer to change what it’s measuring. Right now it does that by setting a volatile variable, and every iteration of the measurer’s loop it checks that variable to see what to measure.

The measurer can’t be in the same thread as the controller as measuring takes time and the controller can’t delay the other work it’s doing.

It feels like the solution is something like making the controller an observable which will emit an item whenever the instructions to the measurer need updating but the only way I can see for the measurer to change it’s behaviour when an event is received is to have a subscriber to these events setting the volatile variable like before, and then I haven’t got anywhere.

I was wondering if I could somehow take the stream of items emitted by the controller and convert it into a stream that repeats each item over and over until the controller emits a different item, then I can subscribe to these repeated items in the measurer which would do a measurement every time it receives one. Is this the right approach, and if it is, how can I transform the items emitted by the controller into a repeated stream of items?

How to solve:

I’m relatively new to Rx, but I would use a BehaviorSubject. You can use distinctUntilChanged(), or combine it with a timer Observable:

    public enum Stat { FOO, BAR }

    public class Controller
    {
        private Subject<Stat> statSubject;

        public Controller()
        {
            statSubject = BehaviorSubject.<Stat>create().toSerialized();
        }

        public Observable<Stat> getStatChange()
        {
            return statSubject.distinctUntilChanged();
        }

        public void setStat( Stat stat )
        {
            statSubject.onNext( stat );
        }
    }

    public class Measurer
    {
        public Measurer( Controller controller )
        {
            Observable.timer( 1, TimeUnit.SECONDS, Schedulers.newThread() )
                .repeat()
                .withLatestFrom(
                        controller.getStatChange(),
                        ( __, stat ) -> stat ) // ignore the Long emitted by timer
                .subscribe( this::measureStat );
        }

        private void measureStat( Stat stat )
        {
            switch( stat )
            {
            case FOO:
                measureFoo();
                break;

            default:
                measureBar();
                break;
            }
        }

        private void measureBar()
        {
            System.out.println( "Measuring Bar" );
        }

        private void measureFoo()
        {
            System.out.println( "Measuring Foo" );
        }
    }

    @Test
    public void testMeasureStats() throws InterruptedException
    {
        Controller controller = new Controller();
        controller.setStat( Stat.BAR );

        @SuppressWarnings( "unused" )
        Measurer measurer = new Measurer( controller );

        Thread.sleep( 5000 );

        controller.setStat( Stat.FOO );

        Thread.sleep( 5000 );

        controller.setStat( Stat.BAR );

        Thread.sleep( 5000 );
    }

Output:

Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar

Leave a Reply

Your email address will not be published. Required fields are marked *