package nucleus.presenter.delivery; import rx.Notification; import rx.Observable; import rx.functions.Func1; import rx.functions.Func2; public class DeliverLatestCache implements Observable.Transformer> { private final Observable view; public DeliverLatestCache(Observable view) { this.view = view; } @Override public Observable> call(Observable observable) { return Observable .combineLatest( view, observable .materialize() .filter(new Func1, Boolean>() { @Override public Boolean call(Notification notification) { return !notification.isOnCompleted(); } }), new Func2, Delivery>() { @Override public Delivery call(View view, Notification notification) { return view == null ? null : new Delivery<>(view, notification); } }) .filter(new Func1, Boolean>() { @Override public Boolean call(Delivery delivery) { return delivery != null; } }); } }