Skip to content

Commit c2d9279

Browse files
committed
3.x: constrain upstream requests from take (ReactiveX#6569)
1 parent 9a36930 commit c2d9279

15 files changed

+76
-247
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+2-50
Original file line numberDiff line numberDiff line change
@@ -11212,52 +11212,6 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
1121211212
return RxJavaPlugins.onAssembly(new FlowableLift<R, T>(this, lifter));
1121311213
}
1121411214

11215-
/**
11216-
* Limits both the number of upstream items (after which the sequence completes)
11217-
* and the total downstream request amount requested from the upstream to
11218-
* possibly prevent the creation of excess items by the upstream.
11219-
* <p>
11220-
* The operator requests at most the given {@code count} of items from upstream even
11221-
* if the downstream requests more than that. For example, given a {@code limit(5)},
11222-
* if the downstream requests 1, a request of 1 is submitted to the upstream
11223-
* and the operator remembers that only 4 items can be requested now on. A request
11224-
* of 5 at this point will request 4 from the upstream and any subsequent requests will
11225-
* be ignored.
11226-
* <p>
11227-
* Note that requests are negotiated on an operator boundary and {@code limit}'s amount
11228-
* may not be preserved further upstream. For example,
11229-
* {@code source.observeOn(Schedulers.computation()).limit(5)} will still request the
11230-
* default (128) elements from the given {@code source}.
11231-
* <p>
11232-
* The main use of this operator is with sources that are async boundaries that
11233-
* don't interfere with request amounts, such as certain {@code Flowable}-based
11234-
* network endpoints that relay downstream request amounts unchanged and are, therefore,
11235-
* prone to trigger excessive item creation/transmission over the network.
11236-
* <dl>
11237-
* <dt><b>Backpressure:</b></dt>
11238-
* <dd>The operator requests a total of the given {@code count} items from the upstream.</dd>
11239-
* <dt><b>Scheduler:</b></dt>
11240-
* <dd>{@code limit} does not operate by default on a particular {@link Scheduler}.</dd>
11241-
* </dl>
11242-
* <p>History: 2.1.6 - experimental
11243-
* @param count the maximum number of items and the total request amount, non-negative.
11244-
* Zero will immediately cancel the upstream on subscription and complete
11245-
* the downstream.
11246-
* @return the new Flowable instance
11247-
* @see #take(long)
11248-
* @see #rebatchRequests(int)
11249-
* @since 2.2
11250-
*/
11251-
@BackpressureSupport(BackpressureKind.SPECIAL)
11252-
@SchedulerSupport(SchedulerSupport.NONE)
11253-
@CheckReturnValue
11254-
public final Flowable<T> limit(long count) {
11255-
if (count < 0) {
11256-
throw new IllegalArgumentException("count >= 0 required but it was " + count);
11257-
}
11258-
return RxJavaPlugins.onAssembly(new FlowableLimit<T>(this, count));
11259-
}
11260-
1126111215
/**
1126211216
* Returns a Flowable that applies a specified function to each item emitted by the source Publisher and
1126311217
* emits the results of these function applications.
@@ -15383,9 +15337,7 @@ public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super
1538315337
* {@link Subscriber#onComplete onComplete}.
1538415338
* <dl>
1538515339
* <dt><b>Backpressure:</b></dt>
15386-
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
15387-
* behavior in case the first request is smaller than the {@code count}. Otherwise, the source {@code Publisher}
15388-
* is consumed in an unbounded manner (i.e., without applying backpressure to it).</dd>
15340+
* <dd>The source {@code Publisher} is consumed in a bounded manner.</dd>
1538915341
* <dt><b>Scheduler:</b></dt>
1539015342
* <dd>This version of {@code take} does not operate by default on a particular {@link Scheduler}.</dd>
1539115343
* </dl>
@@ -15397,7 +15349,7 @@ public final <R> Flowable<R> switchMapSingleDelayError(@NonNull Function<? super
1539715349
* @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
1539815350
*/
1539915351
@CheckReturnValue
15400-
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
15352+
@BackpressureSupport(BackpressureKind.FULL)
1540115353
@SchedulerSupport(SchedulerSupport.NONE)
1540215354
public final Flowable<T> take(long count) {
1540315355
if (count < 0) {

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLimit.java

-135
This file was deleted.

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java

+23-7
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.rxjava3.internal.operators.flowable;
1515

16-
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.concurrent.atomic.AtomicLong;
1717

1818
import org.reactivestreams.*;
1919

@@ -33,7 +33,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
3333
source.subscribe(new TakeSubscriber<T>(s, limit));
3434
}
3535

36-
static final class TakeSubscriber<T> extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {
36+
static final class TakeSubscriber<T> extends AtomicLong implements FlowableSubscriber<T>, Subscription {
3737

3838
private static final long serialVersionUID = -5636543848937116287L;
3939

@@ -69,7 +69,7 @@ public void onSubscribe(Subscription s) {
6969

7070
@Override
7171
public void onNext(T t) {
72-
if (!done && remaining-- > 0) {
72+
if (remaining-- > 0) {
7373
boolean stop = remaining == 0;
7474
downstream.onNext(t);
7575
if (stop) {
@@ -103,13 +103,29 @@ public void request(long n) {
103103
if (!SubscriptionHelper.validate(n)) {
104104
return;
105105
}
106-
if (!get() && compareAndSet(false, true)) {
107-
if (n >= limit) {
108-
upstream.request(Long.MAX_VALUE);
106+
while (true) {
107+
// get the total requests so far r
108+
long r = get();
109+
if (r == limit) {
109110
return;
110111
}
112+
// calculate the new total of requests r2 after overflow
113+
// and limit have been accounted for
114+
long r2 = r + n;
115+
final long more;
116+
if (r2 < 0) {
117+
// overflow
118+
r2 = limit;
119+
more = limit - r;
120+
} else {
121+
r2 = Math.min(limit, r2);
122+
more = r2 - r;
123+
}
124+
if (compareAndSet(r, r2)) {
125+
upstream.request(more);
126+
break;
127+
}
111128
}
112-
upstream.request(n);
113129
}
114130

115131
@Override

src/test/java/io/reactivex/rxjava3/flowable/FlowableSubscriberTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,7 @@ public void cancel() {
331331
}
332332
}).take(2).subscribe(ts);
333333

334-
// FIXME the take now requests Long.MAX_PATH if downstream requests at least the limit
335-
assertEquals(Long.MAX_VALUE, requested.get());
334+
assertEquals(2, requested.get());
336335
}
337336

338337
@Test

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java

-24
Original file line numberDiff line numberDiff line change
@@ -2047,30 +2047,6 @@ public void openCloseTake() {
20472047
ts.assertResult(Collections.<Integer>emptyList());
20482048
}
20492049

2050-
@Test
2051-
@SuppressWarnings("unchecked")
2052-
public void openCloseLimit() {
2053-
PublishProcessor<Integer> source = PublishProcessor.create();
2054-
2055-
PublishProcessor<Integer> openIndicator = PublishProcessor.create();
2056-
2057-
PublishProcessor<Integer> closeIndicator = PublishProcessor.create();
2058-
2059-
TestSubscriber<List<Integer>> ts = source
2060-
.buffer(openIndicator, Functions.justFunction(closeIndicator))
2061-
.limit(1)
2062-
.test(2);
2063-
2064-
openIndicator.onNext(1);
2065-
closeIndicator.onComplete();
2066-
2067-
assertFalse(source.hasSubscribers());
2068-
assertFalse(openIndicator.hasSubscribers());
2069-
assertFalse(closeIndicator.hasSubscribers());
2070-
2071-
ts.assertResult(Collections.<Integer>emptyList());
2072-
}
2073-
20742050
@Test
20752051
@SuppressWarnings("unchecked")
20762052
public void openCloseEmptyBackpressure() {

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithMaybeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public void drainExactRequestCancel() {
357357
final MaybeSubject<Integer> cs = MaybeSubject.create();
358358

359359
TestSubscriber<Integer> ts = pp.mergeWith(cs)
360-
.limit(2)
360+
.take(2)
361361
.subscribeWith(new TestSubscriber<Integer>(2) {
362362
@Override
363363
public void onNext(Integer t) {

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithSingleTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ public void drainExactRequestCancel() {
353353
final SingleSubject<Integer> cs = SingleSubject.create();
354354

355355
TestSubscriber<Integer> ts = pp.mergeWith(cs)
356-
.limit(2)
356+
.take(2)
357357
.subscribeWith(new TestSubscriber<Integer>(2) {
358358
@Override
359359
public void onNext(Integer t) {

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,27 @@ public void subscribe(Subscriber<? super String> subscriber) {
142142
RxJavaPlugins.reset();
143143
}
144144
}
145+
146+
@Test
147+
public void takeEmitsErrors() {
148+
Flowable.error(new TestException())
149+
.take(1)
150+
.test()
151+
.assertNoValues()
152+
.assertError(TestException.class);
153+
}
154+
155+
@Test
156+
public void takeRequestOverflow() {
157+
TestSubscriber<Integer> ts = Flowable.just(1, 2, 3)
158+
.take(3)
159+
.test(0);
160+
ts.requestMore(1)
161+
.assertValues(1)
162+
.assertNotComplete()
163+
.requestMore(Long.MAX_VALUE)
164+
.assertValues(1, 2, 3);
165+
}
145166

146167
@Test
147168
public void unsubscribeAfterTake() {
@@ -305,7 +326,7 @@ public void cancel() {
305326
}
306327

307328
}).take(3).subscribe(ts);
308-
assertEquals(Long.MAX_VALUE, requested.get());
329+
assertEquals(3, requested.get());
309330
}
310331

311332
@Test
@@ -332,7 +353,7 @@ public void cancel() {
332353

333354
}).take(1).subscribe(ts);
334355
//FIXME take triggers fast path if downstream requests more than the limit
335-
assertEquals(Long.MAX_VALUE, requested.get());
356+
assertEquals(1, requested.get());
336357
}
337358

338359
@Test
@@ -383,7 +404,7 @@ public void accept(long n) {
383404
ts.awaitDone(5, TimeUnit.SECONDS);
384405
ts.assertComplete();
385406
ts.assertNoErrors();
386-
assertEquals(3, requests.get());
407+
assertEquals(2, requests.get());
387408
}
388409

389410
@Test

0 commit comments

Comments
 (0)