RxJava - Make A Pausable Observable (with Buffer And Window For Example)

I want to create observables that do the following: buffer all items while they are paused; emit items immediately while they are not paused; the pause/resume trigger must come from a

Solution 1:

You can use the .buffer() operator and pass it an observable that defines when each buffer is closed and emitted. A sample from the book:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);

From Part 3, 'Taming the sequence', chapter 5, 'Time-shifted sequences': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md
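To make the boundary idea concrete, here is a minimal plain-Java sketch of what "buffer until a signal arrives" means. It deliberately does not use RxJava: BoundaryBuffer, onItem and onBoundary are hypothetical names invented for this illustration, and the sketch ignores completion, errors and scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class, not part of RxJava:
// collects items until a boundary signal arrives, then hands the batch downstream.
class BoundaryBuffer<T> {
    private final Consumer<List<T>> downstream;
    private List<T> current = new ArrayList<>();

    BoundaryBuffer(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    // Called for every source item: remember it in the open batch.
    synchronized void onItem(T item) {
        current.add(item);
    }

    // Called when the boundary fires: emit the open batch and start a new one.
    // Assumption of this sketch: an empty batch is emitted as an empty list.
    synchronized void onBoundary() {
        List<T> batch = current;
        current = new ArrayList<>();
        downstream.accept(batch);
    }

    public static void main(String[] args) {
        BoundaryBuffer<Integer> buffer = new BoundaryBuffer<>(System.out::println);
        buffer.onItem(0);
        buffer.onItem(1);
        buffer.onBoundary(); // prints [0, 1]
        buffer.onItem(2);
        buffer.onBoundary(); // prints [2]
        buffer.onBoundary(); // prints []
    }
}
```

In the RxJava sample, the source interval plays the role of onItem and the 250 ms interval plays the role of onBoundary.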

You can use a PublishSubject as the Observable that feeds elements into your custom operator. Every time you need to start buffering, create a new instance with Observable.defer(() -> createBufferingValve()).
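The valve behaviour the question asks for (hold items while paused, pass them straight through otherwise) can be sketched in plain Java, without RxJava. This is only an illustration under stated assumptions: PausableValve, onItem and setPaused are hypothetical names, not an RxJava API and not the body of createBufferingValve(). In an RxJava setting, the pause/resume trigger would presumably be another observable whose subscriber calls setPaused.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration class, not part of RxJava:
// forwards items immediately while open, queues them while paused.
class PausableValve<T> {
    private final Consumer<T> downstream;
    private final Queue<T> pending = new ArrayDeque<>();
    private boolean paused;

    PausableValve(Consumer<T> downstream, boolean startPaused) {
        this.downstream = downstream;
        this.paused = startPaused;
    }

    // Called for every source item.
    synchronized void onItem(T item) {
        if (paused) {
            pending.add(item);       // hold the item until resume
        } else {
            downstream.accept(item); // pass straight through
        }
    }

    // Called by the pause/resume trigger.
    synchronized void setPaused(boolean pause) {
        paused = pause;
        if (!paused) {
            // On resume, drain everything that was held, in arrival order.
            T item;
            while ((item = pending.poll()) != null) {
                downstream.accept(item);
            }
        }
    }

    public static void main(String[] args) {
        PausableValve<String> valve = new PausableValve<>(System.out::println, false);
        valve.onItem("a");      // prints a
        valve.setPaused(true);
        valve.onItem("b");      // held
        valve.onItem("c");      // held
        valve.setPaused(false); // prints b, then c
        valve.onItem("d");      // prints d
    }
}
```

Both methods are synchronized so that an item arriving during a resume cannot overtake the items being drained; a real operator would also need to handle completion, errors and backpressure.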

Solution 2:

I made a similar thing for logging events. A Subject collects events and pushes them to the server once every 10 seconds.

The main idea: suppose, for example, you have a class Event.

public class Event {

    public String jsonData;

    public String getJsonData() {
        return jsonData;
    }

    public Event setJsonData(String jsonData) {
        this.jsonData = jsonData;
        return this;
    }
}

You should create a queue for events:

private PublishSubject<Event> eventQueue = PublishSubject.create();

It can also be a BehaviorSubject; it doesn't matter.

Then you should create the logic that handles pushing events to the server:

    eventObservable = eventQueue.asObservable()
            .buffer(10, TimeUnit.SECONDS)   // flush events every 10 seconds; each emission is a List<Event>
            // no toList() here: it would only emit once the subject completes
            .doOnNext(new Action1<List<Event>>() {
                @Override
                public void call(List<Event> events) {
                    apiClient.pushEvents(events);     // push the batch of events
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() {
                @Override
                public Observable<List<Event>> call(Throwable throwable) {
                    // swallow the error so onError is never delivered; the stream completes instead
                    return Observable.empty();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());

Then subscribe to it and retain the subscription until you no longer need it:

eventSubscription = eventObservable.subscribe();

Hope this helps.
