Skip to content

Commit 9e908da

Browse files
Add onDropped callback for onBackpressureLatest - ReactiveX#7458
1 parent 329e8c9 commit 9e908da

File tree

3 files changed

+127
-6
lines changed

3 files changed

+127
-6
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12755,7 +12755,45 @@ public final Flowable<T> onBackpressureDrop(@NonNull Consumer<? super T> onDrop)
1275512755
@SchedulerSupport(SchedulerSupport.NONE)
1275612756
@NonNull
1275712757
public final Flowable<T> onBackpressureLatest() {
12758-
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
12758+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, null));
12759+
}
12760+
12761+
/**
12762+
* Drops all but the latest item emitted by the current {@code Flowable} if the downstream is not ready to receive
12763+
* new items (indicated by a lack of {@link Subscription#request(long)} calls from it) and emits this latest
12764+
* item when the downstream becomes ready.
12765+
* <p>
12766+
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.latest.v3.png" alt="">
12767+
* <p>
12768+
* Its behavior is logically equivalent to {@code blockingLatest()} with the exception that
12769+
* the downstream is not blocking while requesting more values.
12770+
* <p>
12771+
* Note that if the current {@code Flowable} does support backpressure, this operator ignores that capability
12772+
* and doesn't propagate any backpressure requests from downstream.
12773+
* <p>
12774+
* Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
12775+
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of {@code onNext} events.
12776+
* <dl>
12777+
* <dt><b>Backpressure:</b></dt>
12778+
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
12779+
* manner (i.e., not applying backpressure to it).</dd>
12780+
* <dt><b>Scheduler:</b></dt>
12781+
* <dd>{@code onBackpressureLatest} does not operate by default on a particular {@link Scheduler}.</dd>
12782+
* </dl>
12783+
*
12784+
* @param onDropped
12785+
* called with the current entry when it has been replaced by a new one
12786+
* @throws NullPointerException if {@code onDropped} is {@code null}
12787+
* @return the new {@code Flowable} instance
12788+
* @since 1.1.0
12789+
*/
12790+
@CheckReturnValue
12791+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
12792+
@SchedulerSupport(SchedulerSupport.NONE)
12793+
@NonNull
12794+
public final Flowable<T> onBackpressureLatest(@NonNull Consumer<T> onDropped) {
12795+
Objects.requireNonNull(onDropped, "onDropped is null");
12796+
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, onDropped));
1275912797
}
1276012798

1276112799
/**

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,48 @@
1414
package io.reactivex.rxjava3.internal.operators.flowable;
1515

1616
import io.reactivex.rxjava3.core.Flowable;
17+
import io.reactivex.rxjava3.exceptions.Exceptions;
18+
import io.reactivex.rxjava3.functions.Consumer;
1719
import org.reactivestreams.Subscriber;
1820

1921
public final class FlowableOnBackpressureLatest<T> extends AbstractFlowableWithUpstream<T, T> {
2022

21-
public FlowableOnBackpressureLatest(Flowable<T> source) {
23+
final Consumer<? super T> onDropped;
24+
25+
public FlowableOnBackpressureLatest(Flowable<T> source, Consumer<? super T> onDropped) {
2226
super(source);
27+
this.onDropped = onDropped;
2328
}
2429

2530
@Override
2631
protected void subscribeActual(Subscriber<? super T> s) {
27-
source.subscribe(new BackpressureLatestSubscriber<>(s));
32+
source.subscribe(new BackpressureLatestSubscriber<>(s, onDropped));
2833
}
2934

3035
static final class BackpressureLatestSubscriber<T> extends AbstractBackpressureThrottlingSubscriber<T, T> {
3136

3237
private static final long serialVersionUID = 163080509307634843L;
3338

34-
BackpressureLatestSubscriber(Subscriber<? super T> downstream) {
39+
final Consumer<? super T> onDropped;
40+
41+
BackpressureLatestSubscriber(Subscriber<? super T> downstream,
42+
Consumer<? super T> onDropped) {
3543
super(downstream);
44+
this.onDropped = onDropped;
3645
}
3746

3847
@Override
3948
public void onNext(T t) {
40-
current.lazySet(t);
49+
T oldValue = current.getAndSet(t);
50+
if (onDropped != null && oldValue != null) {
51+
try {
52+
onDropped.accept(oldValue);
53+
} catch (Throwable ex) {
54+
Exceptions.throwIfFatal(ex);
55+
upstream.cancel();
56+
downstream.onError(ex);
57+
}
58+
}
4159
drain();
4260
}
4361
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatestTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.TimeUnit;
1818

1919
import org.junit.*;
20+
import org.mockito.InOrder;
2021
import org.reactivestreams.Publisher;
2122

2223
import io.reactivex.rxjava3.core.*;
@@ -27,6 +28,8 @@
2728
import io.reactivex.rxjava3.subscribers.TestSubscriber;
2829
import io.reactivex.rxjava3.testsupport.*;
2930

31+
import static org.mockito.Mockito.inOrder;
32+
3033
public class FlowableOnBackpressureLatestTest extends RxJavaTest {
3134
@Test
3235
public void simple() {
@@ -62,6 +65,68 @@ public void simpleBackpressure() {
6265
ts.assertNotComplete();
6366
}
6467

68+
@Test
69+
public void simpleBackpressureWithOnDroppedCallback() {
70+
PublishProcessor<Integer> source = PublishProcessor.create();
71+
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);
72+
73+
Observer<Object> dropCallbackObserver = TestHelper.mockObserver();
74+
75+
source.onBackpressureLatest(dropCallbackObserver::onNext)
76+
.subscribe(ts);
77+
78+
ts.assertNoValues();
79+
80+
source.onNext(1);
81+
source.onNext(2);
82+
source.onNext(3);
83+
84+
ts.request(1);
85+
86+
ts.assertValues(3);
87+
88+
source.onNext(4);
89+
source.onNext(5);
90+
91+
ts.request(2);
92+
93+
ts.assertValues(3,5);
94+
95+
InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
96+
dropCallbackOrder.verify(dropCallbackObserver).onNext(1);
97+
dropCallbackOrder.verify(dropCallbackObserver).onNext(2);
98+
dropCallbackOrder.verify(dropCallbackObserver).onNext(4);
99+
dropCallbackOrder.verifyNoMoreInteractions();
100+
}
101+
102+
@Test
103+
public void simpleBackpressureWithOnDroppedCallbackEx() {
104+
PublishProcessor<Integer> source = PublishProcessor.create();
105+
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);
106+
107+
source.onBackpressureLatest(e -> {
108+
if (e == 3) {
109+
throw new TestException("forced");
110+
}
111+
})
112+
.subscribe(ts);
113+
114+
ts.assertNoValues();
115+
116+
source.onNext(1);
117+
source.onNext(2);
118+
119+
ts.request(1);
120+
121+
ts.assertValues(2);
122+
123+
source.onNext(3);
124+
source.onNext(4);
125+
126+
ts.assertError(TestException.class);
127+
ts.assertValues(2);
128+
}
129+
65130
@Test
66131
public void synchronousDrop() {
67132
PublishProcessor<Integer> source = PublishProcessor.create();
@@ -105,7 +170,7 @@ public void synchronousDrop() {
105170
}
106171

107172
@Test
108-
public void asynchronousDrop() throws InterruptedException {
173+
public void asynchronousDrop() {
109174
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>(1L) {
110175
final Random rnd = new Random();
111176
@Override

0 commit comments

Comments
 (0)