Skip to content

Add onDropped callback for onBackpressureLatest #7542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12755,7 +12755,46 @@ public final Flowable<T> onBackpressureDrop(@NonNull Consumer<? super T> onDrop)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Flowable<T> onBackpressureLatest() {
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, null));
}

/**
* Drops all but the latest item emitted by the current {@code Flowable} if the downstream is not ready to receive
* new items (indicated by a lack of {@link Subscription#request(long)} calls from it) and emits this latest
* item when the downstream becomes ready.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.latest.v3.png" alt="">
* <p>
* Its behavior is logically equivalent to {@code blockingLatest()} with the exception that
* the downstream is not blocking while requesting more values.
* <p>
* Note that if the current {@code Flowable} does support backpressure, this operator ignores that capability
* and doesn't propagate any backpressure requests from downstream.
* <p>
* Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
* requesting more than 1 from downstream doesn't guarantee a continuous delivery of {@code onNext} events.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
* manner (i.e., not applying backpressure to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureLatest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onDropped
* called with the current entry when it has been replaced by a new one
* @throws NullPointerException if {@code onDropped} is {@code null}
* @return the new {@code Flowable} instance
* @since 3.1.7
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@Experimental
public final Flowable<T> onBackpressureLatest(@NonNull Consumer<? super T> onDropped) {
Objects.requireNonNull(onDropped, "onDropped is null");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, onDropped));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,48 @@
package io.reactivex.rxjava3.internal.operators.flowable;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.Subscriber;

public final class FlowableOnBackpressureLatest<T> extends AbstractFlowableWithUpstream<T, T> {

public FlowableOnBackpressureLatest(Flowable<T> source) {
final Consumer<? super T> onDropped;

public FlowableOnBackpressureLatest(Flowable<T> source, Consumer<? super T> onDropped) {
super(source);
this.onDropped = onDropped;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new BackpressureLatestSubscriber<>(s));
source.subscribe(new BackpressureLatestSubscriber<>(s, onDropped));
}

static final class BackpressureLatestSubscriber<T> extends AbstractBackpressureThrottlingSubscriber<T, T> {

private static final long serialVersionUID = 163080509307634843L;

BackpressureLatestSubscriber(Subscriber<? super T> downstream) {
final Consumer<? super T> onDropped;

BackpressureLatestSubscriber(Subscriber<? super T> downstream,
Consumer<? super T> onDropped) {
super(downstream);
this.onDropped = onDropped;
}

@Override
public void onNext(T t) {
current.lazySet(t);
T oldValue = current.getAndSet(t);
if (onDropped != null && oldValue != null) {
try {
onDropped.accept(oldValue);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
downstream.onError(ex);
}
}
drain();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.TimeUnit;

import org.junit.*;
import org.mockito.InOrder;
import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.*;
Expand All @@ -27,6 +28,8 @@
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;

import static org.mockito.Mockito.inOrder;

public class FlowableOnBackpressureLatestTest extends RxJavaTest {
@Test
public void simple() {
Expand Down Expand Up @@ -62,6 +65,68 @@ public void simpleBackpressure() {
ts.assertNotComplete();
}

@Test
public void simpleBackpressureWithOnDroppedCallback() {
PublishProcessor<Integer> source = PublishProcessor.create();
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);

Observer<Object> dropCallbackObserver = TestHelper.mockObserver();

source.onBackpressureLatest(dropCallbackObserver::onNext)
.subscribe(ts);

ts.assertNoValues();

source.onNext(1);
source.onNext(2);
source.onNext(3);

ts.request(1);

ts.assertValues(3);

source.onNext(4);
source.onNext(5);

ts.request(2);

ts.assertValues(3,5);

InOrder dropCallbackOrder = inOrder(dropCallbackObserver);
dropCallbackOrder.verify(dropCallbackObserver).onNext(1);
dropCallbackOrder.verify(dropCallbackObserver).onNext(2);
dropCallbackOrder.verify(dropCallbackObserver).onNext(4);
dropCallbackOrder.verifyNoMoreInteractions();
}

@Test
public void simpleBackpressureWithOnDroppedCallbackEx() {
PublishProcessor<Integer> source = PublishProcessor.create();
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);

source.onBackpressureLatest(e -> {
if (e == 3) {
throw new TestException("forced");
}
})
.subscribe(ts);

ts.assertNoValues();

source.onNext(1);
source.onNext(2);

ts.request(1);

ts.assertValues(2);

source.onNext(3);
source.onNext(4);

ts.assertError(TestException.class);
ts.assertValues(2);
}

@Test
public void synchronousDrop() {
PublishProcessor<Integer> source = PublishProcessor.create();
Expand Down Expand Up @@ -105,7 +170,7 @@ public void synchronousDrop() {
}

@Test
public void asynchronousDrop() throws InterruptedException {
public void asynchronousDrop() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>(1L) {
final Random rnd = new Random();
@Override
Expand Down