Rxjava - Make A Pausable Observable (with Buffer And Window For Example)
Solution 1:
You can actually use .buffer()
operator passing it observable, defining when to stop buffering, sample from book:
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);
from chapter 5, 'Taming the sequence': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md
You can use PublishSubject
as Observable
to feed it elements in your custom operator. Every time you need to start buffering, create instance by Observable.defer(() -> createBufferingValve())
Solution 2:
I made similar thing for logging events. Subject collects some events, and one time in 10 seconds pushes them to server.
The main idea is, for example you have class Event
.
publicclassEvent {
publicString jsonData;
publicStringgetJsonData() {
return jsonData;
}
publicEventsetJsonData(String jsonData) {
this.jsonData = jsonData;
returnthis;
}
}
You should create queue for events:
privatePublishSubject<Event> eventQueue =PublishSubject.create();
It can be BehaviorSubject
, it doesn't matter
Then you should create the logic, which will handle pushing events to the server:
eventObservable = eventQueue.asObservable()
.buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds
.toList()
.doOnNext(new Action1<List<Event>>() {
@Overridepublic void call(List<Event> events) {
apiClient.pushEvents(events); //push your event
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() {
@OverridepublicObservable<List<Event>> call(Throwable throwable) {
return null; //make sure, that on error will be never called
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
Then you should subscribe to it, and retain subscription, until you don't need it:
eventSubscription = eventObservable.subscribe()
Home this helps
Post a Comment for "Rxjava - Make A Pausable Observable (with Buffer And Window For Example)"