Rxjava: How To Handle Combinelatest() When One Of The Streams Emits Nothing
Solution 1:
If you have Java8 or some Optionals at your fingertips, you may use this construct:
@Test
void name() {
TestScheduler scheduler = new TestScheduler();
Observable<Optional<Integer>> o1$ =Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
Observable<Optional<Integer>> o2$ =Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());
Observable<Optional<Integer>> o3$ =Observable.<Optional<Integer>>never()
.timeout(1000, TimeUnit.MILLISECONDS, scheduler)
.onErrorResumeNext(
throwable -> {
returnObservable.<Optional<Integer>>never()
.mergeWith(Observable.just(Optional.empty()));
});
Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result =Observable.combineLatest(
o1$,
o2$,
o3$,
(integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
.filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());
TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
result.test();
scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);
test.assertNotComplete().assertNoErrors().assertNoValues();
}
As you may no, null values are not allowed to be emitted through observables-pipelines. Therfore we need some other construct to represent null. In Java8 there is a construct called Optional (vavr calls it Option -> also Java8).
In this example o3$-Observable will not emit anything. It could also error, maybe this resembles your case a little bit more. We will catch the error (in this case: timeout-exception) and return a Observable with Optional.empty.
In the combination-callback we combine alle three values. In a later step we filter out all Tuples, which all have valid values (Optional with Value).
You will only get a value emitted, when all three values have been emitted with a value.
When you can not use a Optional-class, you can also define a INVALID-Object like in the following example:
classSo51217041{
privatestaticInteger INVALID_VALUE = 42;
@Test
void name() {
Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());
Observable<Integer> o3$ =
Observable.<Integer>never()
.onErrorResumeNext(
throwable -> {
return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
});
Observable<Tuple3<Integer, Integer, Integer>> result =
Observable.combineLatest(
o1$,
o2$,
o3$,
(integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
.filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content
TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();
test.assertNotComplete().assertNoErrors().assertNoValues();
}
}
Also, when you want a stream to start with INVALID or NULL, that the CombineLatest emits at least one value, you may use Observable#startWith(INVALID) oder Observable#startWith(Optional.empty()). This will guarantee, that the observable will at least emit one value.
Solution 2:
You can use public final Single first(T defaultItem) method. So the code may look like this
getLatestLog.execute()
.first(someDefaultNonNullLog)
.toObservable()
Post a Comment for "Rxjava: How To Handle Combinelatest() When One Of The Streams Emits Nothing"