From 7a2792a3b4b59193e996a172b3d69f7cfd09c8a3 Mon Sep 17 00:00:00 2001 From: des Date: Fri, 4 Nov 2022 09:57:33 +0000 Subject: [PATCH] Add onDropped callback for throttleFirst - addresses #7458 --- .../io/reactivex/rxjava3/core/Flowable.java | 43 ++++++++++- .../io/reactivex/rxjava3/core/Observable.java | 39 +++++++++- .../flowable/FlowableThrottleFirstTimed.java | 37 +++++++--- .../ObservableThrottleFirstTimed.java | 37 ++++++++-- .../flowable/FlowableThrottleFirstTest.java | 72 +++++++++++++++++++ .../ObservableThrottleFirstTest.java | 71 ++++++++++++++++++ .../ParamValidationCheckerTest.java | 2 + 7 files changed, 284 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 6944611a89..c391ef9500 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -17096,7 +17096,48 @@ public final Flowable throttleFirst(long windowDuration, @NonNull TimeUnit un public final Flowable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler)); + return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null)); + } + + /** + * Returns a {@code Flowable} that emits only the first item emitted by the current {@code Flowable} during sequential + * time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}. + *

+ * This differs from {@link #throttleLast} in that this only tracks the passage of time whereas + * {@link #throttleLast} ticks at scheduled intervals. + *

+ * + *

+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param skipDuration + * time to wait before emitting another item after emitting the last item + * @param unit + * the unit of time of {@code skipDuration} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle timeout for each + * event + * @param onDropped + * called when an item doesn't get delivered to the downstream + * + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Sample + * @see RxJava wiki: Backpressure + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.ERROR) + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Flowable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new FlowableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped)); } /** diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 32cedef3b8..7492f6f636 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -14163,7 +14163,44 @@ public final Observable throttleFirst(long windowDuration, @NonNull TimeUnit public final Observable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler)); + return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, null)); + } + + /** + * Returns an {@code Observable} that emits only the first item emitted by the current {@code Observable} during sequential + * time windows of a specified duration, where the windows are managed by a specified {@link Scheduler}. + *

+ * This differs from {@link #throttleLast} in that this only tracks passage of time whereas + * {@code throttleLast} ticks at scheduled intervals. + *

+ * + *

+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ * + * @param skipDuration + * time to wait before emitting another item after emitting the last item + * @param unit + * the unit of time of {@code skipDuration} + * @param scheduler + * the {@code Scheduler} to use internally to manage the timers that handle timeout for each + * event + * @param onDropped + * called when an item doesn't get delivered to the downstream + * + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null} + * @see ReactiveX operators documentation: Sample + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Observable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer onDropped) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + Objects.requireNonNull(onDropped, "onDropped is null"); + return RxJavaPlugins.onAssembly(new ObservableThrottleFirstTimed<>(this, skipDuration, unit, scheduler, onDropped)); } /** diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java index 7ddcbf792a..223076dfc7 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java @@ -16,6 +16,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Consumer; import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; @@ -32,44 +34,53 @@ public final class FlowableThrottleFirstTimed extends AbstractFlowableWithUps final long timeout; final TimeUnit unit; final Scheduler scheduler; + final Consumer onDropped; - public FlowableThrottleFirstTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler) { + public FlowableThrottleFirstTimed(Flowable source, + long timeout, + TimeUnit unit, + Scheduler scheduler, + Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; + this.onDropped = onDropped; } @Override protected void subscribeActual(Subscriber s) { source.subscribe(new DebounceTimedSubscriber<>( new SerializedSubscriber<>(s), - timeout, unit, scheduler.createWorker())); + timeout, unit, scheduler.createWorker(), + onDropped)); } static final class DebounceTimedSubscriber extends AtomicLong implements FlowableSubscriber, Subscription, Runnable { - private static final long serialVersionUID = -9102637559663639004L; + final Subscriber downstream; final long timeout; final TimeUnit unit; final Scheduler.Worker worker; - + final Consumer onDropped; Subscription upstream; - final SequentialDisposable timer = new SequentialDisposable(); - volatile boolean gate; - boolean done; - DebounceTimedSubscriber(Subscriber actual, long timeout, TimeUnit unit, Worker worker) { + DebounceTimedSubscriber(Subscriber actual, + long timeout, + TimeUnit unit, + Worker worker, + Consumer onDropped) { this.downstream = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; + this.onDropped = onDropped; } @Override @@ -106,6 +117,16 @@ public void onNext(T t) { } timer.replace(worker.schedule(this, timeout, unit)); + } else if (onDropped != null) { + try { + onDropped.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + worker.dispose(); + upstream.cancel(); + done = true; + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java index 889c203dd7..aa0f5058fd 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java @@ -19,6 +19,8 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Scheduler.Worker; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; import io.reactivex.rxjava3.observers.SerializedObserver; @@ -26,20 +28,27 @@ public final class ObservableThrottleFirstTimed extends AbstractObservableWit final long timeout; final TimeUnit unit; final Scheduler scheduler; - - public ObservableThrottleFirstTimed(ObservableSource source, - long timeout, TimeUnit unit, Scheduler scheduler) { + final Consumer onDropped; + + public ObservableThrottleFirstTimed( + ObservableSource source, + long timeout, + TimeUnit unit, + Scheduler scheduler, + Consumer onDropped) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; + this.onDropped = onDropped; } @Override public void subscribeActual(Observer t) { source.subscribe(new DebounceTimedObserver<>( new SerializedObserver<>(t), - timeout, unit, scheduler.createWorker())); + timeout, unit, scheduler.createWorker(), + onDropped)); } static final class DebounceTimedObserver @@ -51,16 +60,21 @@ static final class DebounceTimedObserver final long timeout; final TimeUnit unit; final Scheduler.Worker worker; - + final Consumer onDropped; Disposable upstream; - volatile boolean gate; - DebounceTimedObserver(Observer actual, long timeout, TimeUnit unit, Worker worker) { + DebounceTimedObserver( + Observer actual, + long timeout, + TimeUnit unit, + Worker worker, + Consumer onDropped) { this.downstream = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; + this.onDropped = onDropped; } @Override @@ -83,6 +97,15 @@ public void onNext(T t) { d.dispose(); } DisposableHelper.replace(this, worker.schedule(this, timeout, unit)); + } else if (onDropped != null) { + try { + onDropped.accept(t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + worker.dispose(); + upstream.dispose(); + } } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java index 9b71893c63..68a559e3cd 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import io.reactivex.rxjava3.functions.Action; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; @@ -44,6 +45,77 @@ public void before() { subscriber = TestHelper.mockSubscriber(); } + @Test + public void throttlingWithDropCallbackCrashes() throws Throwable { + Flowable source = Flowable.unsafeCreate(new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + publishNext(subscriber, 100, "one"); // publish as it's first + publishNext(subscriber, 300, "two"); // skip as it's last within the first 400 + publishNext(subscriber, 900, "three"); // publish + publishNext(subscriber, 905, "four"); // skip + publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires. + } + }); + + Action whenDisposed = mock(Action.class); + + Flowable sampled = source + .doOnCancel(whenDisposed) + .throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, e -> { + if ("two".equals(e)) { + throw new TestException("forced"); + } + }); + sampled.subscribe(subscriber); + + InOrder inOrder = inOrder(subscriber); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(subscriber, times(1)).onNext("one"); + inOrder.verify(subscriber, times(1)).onError(any(TestException.class)); + inOrder.verify(subscriber, times(0)).onNext("two"); + inOrder.verify(subscriber, times(0)).onNext("three"); + inOrder.verify(subscriber, times(0)).onNext("four"); + inOrder.verify(subscriber, times(0)).onComplete(); + inOrder.verifyNoMoreInteractions(); + verify(whenDisposed).run(); + } + + @Test + public void throttlingWithDropCallback() { + Flowable source = Flowable.unsafeCreate(new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + publishNext(subscriber, 100, "one"); // publish as it's first + publishNext(subscriber, 300, "two"); // skip as it's last within the first 400 + publishNext(subscriber, 900, "three"); // publish + publishNext(subscriber, 905, "four"); // skip + publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires. + } + }); + + Observer dropCallbackObserver = TestHelper.mockObserver(); + Flowable sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext); + sampled.subscribe(subscriber); + + InOrder inOrder = inOrder(subscriber); + InOrder dropCallbackOrder = inOrder(dropCallbackObserver); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(subscriber, times(1)).onNext("one"); + inOrder.verify(subscriber, times(0)).onNext("two"); + dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("two"); + inOrder.verify(subscriber, times(1)).onNext("three"); + inOrder.verify(subscriber, times(0)).onNext("four"); + dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("four"); + inOrder.verify(subscriber, times(1)).onComplete(); + inOrder.verifyNoMoreInteractions(); + dropCallbackOrder.verifyNoMoreInteractions(); + } + @Test public void throttlingWithCompleted() { Flowable source = Flowable.unsafeCreate(new Publisher() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTest.java index 64c0a8dae0..fe296d572b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; +import io.reactivex.rxjava3.functions.Action; import org.junit.*; import org.mockito.InOrder; @@ -41,6 +42,76 @@ public void before() { observer = TestHelper.mockObserver(); } + @Test + public void throttlingWithDropCallbackCrashes() throws Throwable { + Observable source = Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer innerObserver) { + innerObserver.onSubscribe(Disposable.empty()); + publishNext(innerObserver, 100, "one"); // publish as it's first + publishNext(innerObserver, 300, "two"); // skip as it's last within the first 400 + publishNext(innerObserver, 900, "three"); // publish + publishNext(innerObserver, 905, "four"); // skip + publishCompleted(innerObserver, 1000); // Should be published as soon as the timeout expires. + } + }); + + Action whenDisposed = mock(Action.class); + Observable sampled = source + .doOnDispose(whenDisposed) + .throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, e -> { + if ("two".equals(e)) { + throw new TestException("forced"); + } + }); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onError(any(TestException.class)); + inOrder.verify(observer, times(0)).onNext("two"); + inOrder.verify(observer, times(0)).onNext("three"); + inOrder.verify(observer, times(0)).onNext("four"); + inOrder.verify(observer, times(0)).onComplete(); + inOrder.verifyNoMoreInteractions(); + verify(whenDisposed).run(); + } + + @Test + public void throttlingWithDropCallback() { + Observable source = Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer innerObserver) { + innerObserver.onSubscribe(Disposable.empty()); + publishNext(innerObserver, 100, "one"); // publish as it's first + publishNext(innerObserver, 300, "two"); // skip as it's last within the first 400 + publishNext(innerObserver, 900, "three"); // publish + publishNext(innerObserver, 905, "four"); // skip + publishCompleted(innerObserver, 1000); // Should be published as soon as the timeout expires. + } + }); + + Observer dropCallbackObserver = TestHelper.mockObserver(); + Observable sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + InOrder dropCallbackOrder = inOrder(dropCallbackObserver); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(0)).onNext("two"); + dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(0)).onNext("four"); + dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("four"); + inOrder.verify(observer, times(1)).onComplete(); + inOrder.verifyNoMoreInteractions(); + dropCallbackOrder.verifyNoMoreInteractions(); + } + @Test public void throttlingWithCompleted() { Observable source = Observable.unsafeCreate(new ObservableSource() { diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index a0e75f34ff..4fa3dfb39a 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -220,6 +220,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class)); @@ -465,6 +466,7 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleFirst", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class)); // negative time is considered as zero time addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "throttleLast", Long.TYPE, TimeUnit.class));