
RxJava: How to Handle combineLatest() When One of the Streams Emits Nothing

I use combineLatest() to combine 3 streams of observables. All these are combined so that all data in the UI is shown at the same time. Now, there is a scenario in which one of the

Solution 1:

If you are on Java 8 or have some other Optional type at hand, you can use this construct:

@Test
  void name() {
    TestScheduler scheduler = new TestScheduler();

    Observable<Optional<Integer>> o1$ = Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
    Observable<Optional<Integer>> o2$ = Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());

    // o3$ never emits on its own; once the timeout fires it falls back to Optional.empty().
    Observable<Optional<Integer>> o3$ = Observable.<Optional<Integer>>never()
            .timeout(1000, TimeUnit.MILLISECONDS, scheduler)
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Optional<Integer>>never()
                      .mergeWith(Observable.just(Optional.empty()));
                });

    Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result = Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());

    TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
        result.test();

    scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }

As you may know, null values must not be emitted through Observable pipelines. Therefore we need some other construct to represent a missing value. Java 8 provides Optional for this (vavr, which also targets Java 8, calls it Option).

In this example the o3$ Observable does not emit anything. It could also fail with an error, which may resemble your case a little more. We catch the error (in this case a timeout exception) and return an Observable that emits Optional.empty().
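To make the "no value or error becomes an empty Optional" idea concrete outside of RxJava, here is a minimal plain-JDK sketch. It is only an analogy: it uses a CompletableFuture instead of an Observable, and the class name TimeoutFallback and the method awaitOrEmpty are made up for illustration.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of RxJava: models "timeout or error -> Optional.empty()".
class TimeoutFallback {

  // Waits up to timeoutMillis for a value; a timeout or a failed source yields Optional.empty().
  static Optional<Integer> awaitOrEmpty(CompletableFuture<Integer> source, long timeoutMillis) {
    try {
      return Optional.ofNullable(source.get(timeoutMillis, TimeUnit.MILLISECONDS));
    } catch (TimeoutException | ExecutionException e) {
      // Either nothing arrived in time or the source failed: represent both as "no value".
      return Optional.empty();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Integer> silent = new CompletableFuture<>(); // is never completed
    System.out.println(awaitOrEmpty(silent, 50).isPresent());
    System.out.println(awaitOrEmpty(CompletableFuture.completedFuture(4), 50).isPresent());
  }
}
```

The point is the same as in the test above: downstream code always receives something it can inspect, instead of waiting forever or being terminated by the error.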

In the combiner callback we combine all three values. In a later step we keep only the tuples in which every Optional holds a value.

A value is therefore emitted only once all three streams have emitted an Optional that holds a value.
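The filter step on its own can be sketched with nothing but the JDK. This is not RxJava code: a List of three Optionals stands in for the Tuple3, and the names OptionalTripleFilter, allPresent and keepComplete are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-in for the filter(...) applied to the combined tuples.
class OptionalTripleFilter {

  // True only when every slot of the combined triple carries a value.
  static boolean allPresent(List<Optional<Integer>> triple) {
    return triple.stream().allMatch(Optional::isPresent);
  }

  // Keeps the complete triples and drops every triple with an empty slot.
  static List<List<Optional<Integer>>> keepComplete(List<List<Optional<Integer>>> triples) {
    return triples.stream()
        .filter(OptionalTripleFilter::allPresent)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Optional<Integer>> complete =
        Arrays.asList(Optional.of(4), Optional.of(2), Optional.of(7));
    List<Optional<Integer>> missing =
        Arrays.asList(Optional.of(4), Optional.of(2), Optional.<Integer>empty());
    // Only the complete triple survives the filter.
    System.out.println(keepComplete(Arrays.asList(complete, missing)).size());
  }
}
```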

If you cannot use an Optional class, you can instead define an INVALID sentinel object, as in the following example:

class So51217041 {
  private static Integer INVALID_VALUE = 42;

  @Test
  void name() {
    Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
    Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());

    Observable<Integer> o3$ =
        Observable.<Integer>never()
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
                });

    Observable<Tuple3<Integer, Integer, Integer>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content

    TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }
}
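One caveat about comparing the sentinel by reference: autoboxing a small int goes through Integer.valueOf, which caches the values from -128 to 127, so a boxed 42 that is real data would be the very same object as the sentinel. The plain-JDK sketch below demonstrates this and shows a dedicated marker object as one possible alternative. The names SentinelIdentity, BOXED_SENTINEL and INVALID are made up, and using an Object marker assumes the stream's element type can be widened accordingly.

```java
// Hypothetical demo class; it does not depend on RxJava.
class SentinelIdentity {

  // Autoboxed via Integer.valueOf(42): shared with every other boxed 42.
  static final Integer BOXED_SENTINEL = 42;

  // A dedicated marker instance can never be identical to real data.
  static final Object INVALID = new Object();

  // Reference comparison on purpose, as in the filter of the example above.
  static boolean collidesWithRealValue(int realValue) {
    Integer boxed = realValue; // autoboxing
    return boxed == BOXED_SENTINEL;
  }

  // Only the marker instance itself counts as invalid.
  static boolean isInvalid(Object item) {
    return item == INVALID;
  }

  public static void main(String[] args) {
    System.out.println(collidesWithRealValue(42)); // a genuine 42 looks like the sentinel
    System.out.println(isInvalid(42));             // a genuine 42 is not the marker
    System.out.println(isInvalid(INVALID));
  }
}
```

In the test above this does not matter, because o3$ never emits a real value, but it is worth keeping in mind if 42 can occur as regular data.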

Also, if you want a stream to start with an INVALID or empty placeholder so that combineLatest emits at least one value, you can use Observable#startWith(INVALID) or Observable#startWith(Optional.empty()). This guarantees that the Observable emits at least one value.
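Why a seed value unblocks the combination can be seen in a small plain-JDK model of the combineLatest rule: nothing is emitted until every source has delivered at least one item. This is a simplified sketch and not RxJava's implementation; CombineLatestModel and its methods are hypothetical names, and the string "INVALID" merely plays the role of the placeholder passed to startWith.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, single-threaded model of the combineLatest emission rule.
class CombineLatestModel {
  private final Object[] latest; // most recent item per source, null = nothing yet
  private final List<List<Object>> emitted = new ArrayList<>();

  CombineLatestModel(int sourceCount) {
    latest = new Object[sourceCount];
  }

  // Stores the item and emits a snapshot only once every source has a value.
  void onNext(int sourceIndex, Object item) {
    latest[sourceIndex] = item;
    for (Object value : latest) {
      if (value == null) {
        return; // at least one source is still silent
      }
    }
    emitted.add(new ArrayList<Object>(Arrays.asList(latest)));
  }

  List<List<Object>> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    CombineLatestModel silentThird = new CombineLatestModel(3);
    silentThird.onNext(0, 4);
    silentThird.onNext(1, 2);
    System.out.println(silentThird.emitted()); // third source never emitted

    CombineLatestModel seededThird = new CombineLatestModel(3);
    seededThird.onNext(2, "INVALID"); // plays the role of startWith(INVALID)
    seededThird.onNext(0, 4);
    seededThird.onNext(1, 2);
    System.out.println(seededThird.emitted());
  }
}
```

With the seed in place, the downstream filter can decide whether a combination containing the placeholder should reach the UI.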

Solution 2:

You can use the public final Single<T> first(T defaultItem) method, so the code may look like this:

getLatestLog.execute()
.first(someDefaultNonNullLog)
.toObservable()
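Keep in mind that first(defaultItem) falls back to the default only when the source completes without emitting anything; a source that never completes will not produce the default. The semantics can be sketched with plain JDK types as follows. This is an analogy over an already finished sequence, not RxJava code, and FirstOrDefault and firstOrDefault are made-up names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

// Hypothetical helper mirroring "first item, or a default if the source is empty".
class FirstOrDefault {

  // Returns the first element of a finished sequence, or defaultItem when it is empty.
  static <T> T firstOrDefault(Iterable<T> completedSource, T defaultItem) {
    Iterator<T> iterator = completedSource.iterator();
    return iterator.hasNext() ? iterator.next() : defaultItem;
  }

  public static void main(String[] args) {
    System.out.println(firstOrDefault(Arrays.asList("log-1", "log-2"), "default-log"));
    System.out.println(firstOrDefault(Collections.<String>emptyList(), "default-log"));
  }
}
```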
