Skip to content

Commit dabe4d5

Browse files
Add onDropped callback for throttleWithTimeout - ReactiveX#7458
1 parent 65d0739 commit dabe4d5

File tree

4 files changed

+153
-12
lines changed

4 files changed

+153
-12
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+92-1
Original file line numberDiff line numberDiff line change
@@ -8930,7 +8930,57 @@ public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit) {
89308930
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
89318931
Objects.requireNonNull(unit, "unit is null");
89328932
Objects.requireNonNull(scheduler, "scheduler is null");
8933-
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler));
8933+
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, null));
8934+
}
8935+
8936+
/**
8937+
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
8938+
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
8939+
* {@link Scheduler}. The timer resets on each emission.
8940+
* <p>
8941+
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
8942+
* will be emitted by the resulting {@code Flowable}.
8943+
* <p>
8944+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.s.v3.png" alt="">
8945+
* <p>
8946+
* Delivery of the item after the grace period happens on the given {@code Scheduler}'s
8947+
* {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
8948+
* {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
8949+
* (yielding an {@code InterruptedException}). It is recommended processing items
8950+
* that may take long time to be moved to another thread via {@link #observeOn} applied after
8951+
* {@code debounce} itself.
8952+
* <dl>
8953+
* <dt><b>Backpressure:</b></dt>
8954+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
8955+
* <dt><b>Scheduler:</b></dt>
8956+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
8957+
* </dl>
8958+
*
8959+
* @param timeout
8960+
* the time each item has to be "the most recent" of those emitted by the current {@code Flowable} to
8961+
* ensure that it's not dropped
8962+
* @param unit
8963+
* the unit of time for the specified {@code timeout}
8964+
* @param scheduler
8965+
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
8966+
* item
8967+
* @param onDropped
8968+
* called with the current entry when it has been replaced by a new one
8969+
* @return the new {@code Flowable} instance
8970+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
8971+
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
8972+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
8973+
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
8974+
*/
8975+
@CheckReturnValue
8976+
@NonNull
8977+
@BackpressureSupport(BackpressureKind.ERROR)
8978+
@SchedulerSupport(SchedulerSupport.CUSTOM)
8979+
public final Flowable<T> debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
8980+
Objects.requireNonNull(unit, "unit is null");
8981+
Objects.requireNonNull(scheduler, "scheduler is null");
8982+
Objects.requireNonNull(onDropped, "onDropped is null");
8983+
return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
89348984
}
89358985

89368986
/**
@@ -17587,6 +17637,47 @@ public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit uni
1758717637
return debounce(timeout, unit, scheduler);
1758817638
}
1758917639

17640+
/**
17641+
* Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
17642+
* current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
17643+
* {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler)}).
17644+
* <p>
17645+
* <em>Note:</em> If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
17646+
* will be emitted by the resulting {@code Flowable}.
17647+
* <p>
17648+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.s.v3.png" alt="">
17649+
* <dl>
17650+
* <dt><b>Backpressure:</b></dt>
17651+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17652+
* <dt><b>Scheduler:</b></dt>
17653+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
17654+
* </dl>
17655+
*
17656+
* @param timeout
17657+
* the length of the window of time that must pass after the emission of an item from the current
17658+
* {@code Flowable} in which it emits no items in order for the item to be emitted by the
17659+
* resulting {@code Flowable}
17660+
* @param unit
17661+
* the unit of time for the specified {@code timeout}
17662+
* @param scheduler
17663+
* the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
17664+
* item
17665+
* @param onDropped
17666+
* called with the current entry when it has been replaced by a new one
17667+
* @return the new {@code Flowable} instance
17668+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
17669+
* @see <a href="http://reactivex.io/documentation/operators/debounce.html">ReactiveX operators documentation: Debounce</a>
17670+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17671+
* @see #debounce(long, TimeUnit, Scheduler)
17672+
*/
17673+
@CheckReturnValue
17674+
@BackpressureSupport(BackpressureKind.ERROR)
17675+
@SchedulerSupport(SchedulerSupport.CUSTOM)
17676+
@NonNull
17677+
public final Flowable<T> throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
17678+
return debounce(timeout, unit, scheduler, onDropped);
17679+
}
17680+
1759017681
/**
1759117682
* Returns a {@code Flowable} that emits records of the time interval between consecutive items emitted by the
1759217683
* current {@code Flowable}.

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java

+25-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.concurrent.TimeUnit;
1717
import java.util.concurrent.atomic.*;
1818

19+
import io.reactivex.rxjava3.exceptions.Exceptions;
20+
import io.reactivex.rxjava3.functions.Consumer;
1921
import org.reactivestreams.*;
2022

2123
import io.reactivex.rxjava3.core.*;
@@ -32,19 +34,20 @@ public final class FlowableDebounceTimed<T> extends AbstractFlowableWithUpstream
3234
final long timeout;
3335
final TimeUnit unit;
3436
final Scheduler scheduler;
37+
final Consumer<T> onDropped;
3538

36-
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
39+
public FlowableDebounceTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer<T> onDropped) {
3740
super(source);
3841
this.timeout = timeout;
3942
this.unit = unit;
4043
this.scheduler = scheduler;
44+
this.onDropped = onDropped;
4145
}
4246

4347
@Override
4448
protected void subscribeActual(Subscriber<? super T> s) {
4549
source.subscribe(new DebounceTimedSubscriber<>(
46-
new SerializedSubscriber<>(s),
47-
timeout, unit, scheduler.createWorker()));
50+
new SerializedSubscriber<>(s), timeout, unit, scheduler.createWorker(), onDropped));
4851
}
4952

5053
static final class DebounceTimedSubscriber<T> extends AtomicLong
@@ -55,20 +58,22 @@ static final class DebounceTimedSubscriber<T> extends AtomicLong
5558
final long timeout;
5659
final TimeUnit unit;
5760
final Scheduler.Worker worker;
61+
final Consumer<T> onDropped;
5862

5963
Subscription upstream;
6064

61-
Disposable timer;
65+
DebounceEmitter<T> timer;
6266

6367
volatile long index;
6468

6569
boolean done;
6670

67-
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
71+
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer<T> onDropped) {
6872
this.downstream = actual;
6973
this.timeout = timeout;
7074
this.unit = unit;
7175
this.worker = worker;
76+
this.onDropped = onDropped;
7277
}
7378

7479
@Override
@@ -93,6 +98,18 @@ public void onNext(T t) {
9398
d.dispose();
9499
}
95100

101+
if (onDropped != null && timer != null) {
102+
try {
103+
onDropped.accept(timer.value);
104+
} catch (Throwable ex) {
105+
Exceptions.throwIfFatal(ex);
106+
upstream.cancel();
107+
done = true;
108+
downstream.onError(ex);
109+
worker.dispose();
110+
}
111+
}
112+
96113
DebounceEmitter<T> de = new DebounceEmitter<>(t, idx, this);
97114
timer = de;
98115
d = worker.schedule(de, timeout, unit);
@@ -121,15 +138,13 @@ public void onComplete() {
121138
}
122139
done = true;
123140

124-
Disposable d = timer;
141+
DebounceEmitter<T> d = timer;
125142
if (d != null) {
126143
d.dispose();
127144
}
128145

129-
@SuppressWarnings("unchecked")
130-
DebounceEmitter<T> de = (DebounceEmitter<T>)d;
131-
if (de != null) {
132-
de.emit();
146+
if (d != null) {
147+
d.emit();
133148
}
134149

135150
downstream.onComplete();

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTest.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,39 @@ public void before() {
5151
innerScheduler = scheduler.createWorker();
5252
}
5353

54+
@Test
55+
public void debounceWithOnDroppedCallback() {
56+
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
57+
@Override
58+
public void subscribe(Subscriber<? super String> subscriber) {
59+
subscriber.onSubscribe(new BooleanSubscription());
60+
publishNext(subscriber, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
61+
publishNext(subscriber, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
62+
publishNext(subscriber, 900, "three"); // Should be skipped since "four" will arrive before the timout expires.
63+
publishNext(subscriber, 999, "four"); // Should be skipped since onComplete will arrive before the timeout expires.
64+
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
65+
}
66+
});
67+
68+
Observer<Object> dropCallbackObserver = TestHelper.mockObserver();
69+
Flowable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler, dropCallbackObserver::onNext);
70+
sampled.subscribe(Subscriber);
71+
72+
scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
73+
InOrder inOrder = inOrder(Subscriber);
74+
InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
75+
76+
// must go to 800 since it must be 400 after when two is sent, which is at 400
77+
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
78+
inOrder.verify(Subscriber, times(1)).onNext("two");
79+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("one");
80+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
81+
dropCallbackOrder.verify(dropCallbackObserver, times(1)).onNext("three");
82+
inOrder.verify(Subscriber, times(1)).onComplete();
83+
inOrder.verifyNoMoreInteractions();
84+
dropCallbackOrder.verifyNoMoreInteractions();
85+
}
86+
5487
@Test
5588
public void debounceWithCompleted() {
5689
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
@@ -530,7 +563,7 @@ public void timedBadRequest() {
530563
public void timedLateEmit() {
531564
TestSubscriber<Integer> ts = new TestSubscriber<>();
532565
DebounceTimedSubscriber<Integer> sub = new DebounceTimedSubscriber<>(
533-
ts, 1, TimeUnit.SECONDS, new TestScheduler().createWorker());
566+
ts, 1, TimeUnit.SECONDS, new TestScheduler().createWorker(), null);
534567

535568
sub.onSubscribe(new BooleanSubscription());
536569

src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public void checkParallelFlowable() {
149149
// negative time is considered as zero time
150150
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class));
151151
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class));
152+
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "debounce", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class));
152153

153154
// null Action allowed
154155
addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "onBackpressureBuffer", Long.TYPE, Action.class, BackpressureOverflowStrategy.class));
@@ -177,6 +178,7 @@ public void checkParallelFlowable() {
177178
// negative time is considered as zero time
178179
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class));
179180
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class));
181+
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "throttleWithTimeout", Long.TYPE, TimeUnit.class, Scheduler.class, Consumer.class));
180182

181183
// negative time is considered as zero time
182184
addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "take", Long.TYPE, TimeUnit.class));

0 commit comments

Comments
 (0)