Skip to content

Commit f8841c7

Browse files
davidmotenakarnokd
authored andcommitted
elementAt, first - constrain upstream requests (#6620)
* constrain upstream requests by FlowableElementAtMaybe * constrain upstream requests by FlowableElementAtSingle * update first(default) and firstElement BackpressureSupport type to reflect upstream request constraints * set BackpressureSupport for Flowable.firstOrError to FULL * update javadoc on backpressure for elementAt and first
1 parent a644739 commit f8841c7

File tree

4 files changed

+47
-20
lines changed

4 files changed

+47
-20
lines changed

src/main/java/io/reactivex/Flowable.java

+12-18
Original file line numberDiff line numberDiff line change
@@ -9494,8 +9494,7 @@ public final Flowable<T> doOnTerminate(final Action onTerminate) {
94949494
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAt.png" alt="">
94959495
* <dl>
94969496
* <dt><b>Backpressure:</b></dt>
9497-
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
9498-
* (i.e., no backpressure applied to it).</dd>
9497+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
94999498
* <dt><b>Scheduler:</b></dt>
95009499
* <dd>{@code elementAt} does not operate by default on a particular {@link Scheduler}.</dd>
95019500
* </dl>
@@ -9507,7 +9506,7 @@ public final Flowable<T> doOnTerminate(final Action onTerminate) {
95079506
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
95089507
*/
95099508
@CheckReturnValue
9510-
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
9509+
@BackpressureSupport(BackpressureKind.FULL)
95119510
@SchedulerSupport(SchedulerSupport.NONE)
95129511
public final Maybe<T> elementAt(long index) {
95139512
if (index < 0) {
@@ -9523,8 +9522,7 @@ public final Maybe<T> elementAt(long index) {
95239522
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
95249523
* <dl>
95259524
* <dt><b>Backpressure:</b></dt>
9526-
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
9527-
* (i.e., no backpressure applied to it).</dd>
9525+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
95289526
* <dt><b>Scheduler:</b></dt>
95299527
* <dd>{@code elementAt} does not operate by default on a particular {@link Scheduler}.</dd>
95309528
* </dl>
@@ -9541,7 +9539,7 @@ public final Maybe<T> elementAt(long index) {
95419539
*/
95429540
@CheckReturnValue
95439541
@NonNull
9544-
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
9542+
@BackpressureSupport(BackpressureKind.FULL)
95459543
@SchedulerSupport(SchedulerSupport.NONE)
95469544
public final Single<T> elementAt(long index, T defaultItem) {
95479545
if (index < 0) {
@@ -9558,8 +9556,7 @@ public final Single<T> elementAt(long index, T defaultItem) {
95589556
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
95599557
* <dl>
95609558
* <dt><b>Backpressure:</b></dt>
9561-
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
9562-
* (i.e., no backpressure applied to it).</dd>
9559+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
95639560
* <dt><b>Scheduler:</b></dt>
95649561
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
95659562
* </dl>
@@ -9573,7 +9570,7 @@ public final Single<T> elementAt(long index, T defaultItem) {
95739570
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
95749571
*/
95759572
@CheckReturnValue
9576-
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
9573+
@BackpressureSupport(BackpressureKind.FULL)
95779574
@SchedulerSupport(SchedulerSupport.NONE)
95789575
public final Single<T> elementAtOrError(long index) {
95799576
if (index < 0) {
@@ -9617,8 +9614,7 @@ public final Flowable<T> filter(Predicate<? super T> predicate) {
96179614
* <img width="640" height="237" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstElement.m.png" alt="">
96189615
* <dl>
96199616
* <dt><b>Backpressure:</b></dt>
9620-
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
9621-
* unbounded manner (i.e., without applying backpressure).</dd>
9617+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
96229618
* <dt><b>Scheduler:</b></dt>
96239619
* <dd>{@code firstElement} does not operate by default on a particular {@link Scheduler}.</dd>
96249620
* </dl>
@@ -9627,7 +9623,7 @@ public final Flowable<T> filter(Predicate<? super T> predicate) {
96279623
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
96289624
*/
96299625
@CheckReturnValue
9630-
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
9626+
@BackpressureSupport(BackpressureKind.FULL)
96319627
@SchedulerSupport(SchedulerSupport.NONE)
96329628
public final Maybe<T> firstElement() {
96339629
return elementAt(0);
@@ -9640,8 +9636,7 @@ public final Maybe<T> firstElement() {
96409636
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/first.s.png" alt="">
96419637
* <dl>
96429638
* <dt><b>Backpressure:</b></dt>
9643-
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
9644-
* unbounded manner (i.e., without applying backpressure).</dd>
9639+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
96459640
* <dt><b>Scheduler:</b></dt>
96469641
* <dd>{@code first} does not operate by default on a particular {@link Scheduler}.</dd>
96479642
* </dl>
@@ -9653,7 +9648,7 @@ public final Maybe<T> firstElement() {
96539648
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
96549649
*/
96559650
@CheckReturnValue
9656-
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
9651+
@BackpressureSupport(BackpressureKind.FULL)
96579652
@SchedulerSupport(SchedulerSupport.NONE)
96589653
public final Single<T> first(T defaultItem) {
96599654
return elementAt(0, defaultItem);
@@ -9666,8 +9661,7 @@ public final Single<T> first(T defaultItem) {
96669661
* <img width="640" height="237" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
96679662
* <dl>
96689663
* <dt><b>Backpressure:</b></dt>
9669-
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
9670-
* unbounded manner (i.e., without applying backpressure).</dd>
9664+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
96719665
* <dt><b>Scheduler:</b></dt>
96729666
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
96739667
* </dl>
@@ -9676,7 +9670,7 @@ public final Single<T> first(T defaultItem) {
96769670
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
96779671
*/
96789672
@CheckReturnValue
9679-
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
9673+
@BackpressureSupport(BackpressureKind.FULL) // take may trigger UNBOUNDED_IN
96809674
@SchedulerSupport(SchedulerSupport.NONE)
96819675
public final Single<T> firstOrError() {
96829676
return elementAtOrError(0);

src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void onSubscribe(Subscription s) {
6363
if (SubscriptionHelper.validate(this.upstream, s)) {
6464
this.upstream = s;
6565
downstream.onSubscribe(this);
66-
s.request(Long.MAX_VALUE);
66+
s.request(index + 1);
6767
}
6868
}
6969

src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void onSubscribe(Subscription s) {
7070
if (SubscriptionHelper.validate(this.upstream, s)) {
7171
this.upstream = s;
7272
downstream.onSubscribe(this);
73-
s.request(Long.MAX_VALUE);
73+
s.request(index + 1);
7474
}
7575
}
7676

src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivex.disposables.Disposables;
2727
import io.reactivex.exceptions.TestException;
2828
import io.reactivex.functions.Function;
29+
import io.reactivex.functions.LongConsumer;
2930
import io.reactivex.internal.subscriptions.BooleanSubscription;
3031
import io.reactivex.plugins.RxJavaPlugins;
3132
import io.reactivex.processors.PublishProcessor;
@@ -69,6 +70,38 @@ public void elementAt() {
6970
assertEquals(2, Flowable.fromArray(1, 2).elementAt(1).blockingGet()
7071
.intValue());
7172
}
73+
74+
@Test
75+
public void elementAtConstrainsUpstreamRequests() {
76+
final List<Long> requests = new ArrayList<Long>();
77+
Flowable.fromArray(1, 2, 3, 4)
78+
.doOnRequest(new LongConsumer() {
79+
@Override
80+
public void accept(long n) throws Throwable {
81+
requests.add(n);
82+
}
83+
})
84+
.elementAt(2)
85+
.blockingGet()
86+
.intValue();
87+
assertEquals(Arrays.asList(3L), requests);
88+
}
89+
90+
@Test
91+
public void elementAtWithDefaultConstrainsUpstreamRequests() {
92+
final List<Long> requests = new ArrayList<Long>();
93+
Flowable.fromArray(1, 2, 3, 4)
94+
.doOnRequest(new LongConsumer() {
95+
@Override
96+
public void accept(long n) throws Throwable {
97+
requests.add(n);
98+
}
99+
})
100+
.elementAt(2, 100)
101+
.blockingGet()
102+
.intValue();
103+
assertEquals(Arrays.asList(3L), requests);
104+
}
72105

73106
@Test(expected = IndexOutOfBoundsException.class)
74107
public void elementAtWithMinusIndex() {

0 commit comments

Comments
 (0)