Skip to content Skip to sidebar Skip to footer

Rx Java Mergedelayerror Not Working As Expected

I'm using RxJava in and Android application with RxAndroid. I'm using mergeDelayError to combine two retro fit network calls into one observable which will process emitted items if

Solution 1:

Use .observeOn(AndroidSchedulers.mainThread(), true) instead of .observeOn(AndroidSchedulers.mainThread()

publicfinal Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }

Above is the signature of observeOn function. Following code works.

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });

Got this trick from ConcatDelayError thread: https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009

Solution 2:

This still seems like a bug in the mergeDelayError operator but I was able to get it working by duplicating the observerOn and Subscribe on for each observable.

Observable.mergeDelayError(
            Observable.error(new RuntimeException())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io()),
            Observable.just("Hello")
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io())
        )
        .finallyDo(completeAction)
        .subscribe(successAction, errorAction);

Solution 3:

I think you don't wait for the terminal event and the main thread quits before the events are delivered to your observer. The following test passes for me with RxJava 1.0.14:

@Test
publicvoid errorDelayed() {
    TestSubscriber<Object> ts = TestSubscriber.create();
    Observable.mergeDelayError(
            Observable.error(newRuntimeException()),
            Observable.just("Hello")
        )
        .subscribeOn(Schedulers.io()).subscribe(ts);

    ts.awaitTerminalEvent();

    ts.assertError(RuntimeException.class);
    ts.assertValue("Hello");
}

Post a Comment for "Rx Java Mergedelayerror Not Working As Expected"