Skip to content

Commit d1cbf57

Browse files
authored
3.x: Add blockingForEach(Consumer, int) overload (#6800)
1 parent 38bcd5e commit d1cbf57

File tree

2 files changed

+88
-6
lines changed

2 files changed

+88
-6
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+48-5
Original file line numberDiff line numberDiff line change
@@ -5663,8 +5663,8 @@ public final T blockingFirst(@NonNull T defaultItem) {
56635663
* sequence.
56645664
* <dl>
56655665
* <dt><b>Backpressure:</b></dt>
5666-
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
5667-
* (i.e., no backpressure applied to it).</dd>
5666+
* <dd>The operator requests {@link Flowable#bufferSize()} upfront, then 75% of this
5667+
* amount when 75% is received.</dd>
56685668
* <dt><b>Scheduler:</b></dt>
56695669
* <dd>{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.</dd>
56705670
* <dt><b>Error handling:</b></dt>
@@ -5676,14 +5676,57 @@ public final T blockingFirst(@NonNull T defaultItem) {
56765676
* @param onNext
56775677
* the {@link Consumer} to invoke for each item emitted by the {@code Flowable}
56785678
* @throws RuntimeException
5679-
* if an error occurs
5679+
* if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
5680+
* as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
56805681
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
56815682
* @see #subscribe(Consumer)
5683+
* @see #blockingForEach(Consumer, int)
56825684
*/
5683-
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
5685+
@BackpressureSupport(BackpressureKind.FULL)
56845686
@SchedulerSupport(SchedulerSupport.NONE)
56855687
public final void blockingForEach(@NonNull Consumer<? super T> onNext) {
5686-
Iterator<T> it = blockingIterable().iterator();
5688+
blockingForEach(onNext, bufferSize());
5689+
}
5690+
5691+
/**
5692+
* Consumes the upstream {@code Flowable} in a blocking fashion and invokes the given
5693+
* {@code Consumer} with each upstream item on the <em>current thread</em> until the
5694+
* upstream terminates.
5695+
* <p>
5696+
* <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt="">
5697+
* <p>
5698+
* <em>Note:</em> the method will only return if the upstream terminates or the current
5699+
* thread is interrupted.
5700+
* <p>
5701+
* This method executes the {@code Consumer} on the current thread while
5702+
* {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the
5703+
* sequence.
5704+
* <dl>
5705+
* <dt><b>Backpressure:</b></dt>
5706+
* <dd>The operator requests the given {@code prefetch} amount upfront, then 75% of this
5707+
* amount when 75% is received.</dd>
5708+
* <dt><b>Scheduler:</b></dt>
5709+
* <dd>{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.</dd>
5710+
* <dt><b>Error handling:</b></dt>
5711+
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
5712+
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
5713+
* {@link Error}s are rethrown as they are.</dd>
5714+
* </dl>
5715+
*
5716+
* @param onNext
5717+
* the {@link Consumer} to invoke for each item emitted by the {@code Flowable}
5718+
* @param bufferSize
5719+
* the number of items to prefetch upfront, then 75% of it after 75% received
5720+
* @throws RuntimeException
5721+
* if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
5722+
* as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
5723+
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
5724+
* @see #subscribe(Consumer)
5725+
*/
5726+
@BackpressureSupport(BackpressureKind.FULL)
5727+
@SchedulerSupport(SchedulerSupport.NONE)
5728+
public final void blockingForEach(@NonNull Consumer<? super T> onNext, int bufferSize) {
5729+
Iterator<T> it = blockingIterable(bufferSize).iterator();
56875730
while (it.hasNext()) {
56885731
try {
56895732
onNext.accept(it.next());

src/main/java/io/reactivex/rxjava3/core/Observable.java

+40-1
Original file line numberDiff line numberDiff line change
@@ -5170,11 +5170,50 @@ public final T blockingFirst(@NonNull T defaultItem) {
51705170
* if an error occurs
51715171
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
51725172
* @see #subscribe(Consumer)
5173+
* @see #blockingForEach(Consumer, int)
51735174
*/
51745175
@SchedulerSupport(SchedulerSupport.NONE)
51755176
@NonNull
51765177
public final void blockingForEach(@NonNull Consumer<? super T> onNext) {
5177-
Iterator<T> it = blockingIterable().iterator();
5178+
blockingForEach(onNext, bufferSize());
5179+
}
5180+
5181+
/**
5182+
* Consumes the upstream {@code Observable} in a blocking fashion and invokes the given
5183+
* {@code Consumer} with each upstream item on the <em>current thread</em> until the
5184+
* upstream terminates.
5185+
* <p>
5186+
* <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingForEach.o.png" alt="">
5187+
* <p>
5188+
* <em>Note:</em> the method will only return if the upstream terminates or the current
5189+
* thread is interrupted.
5190+
* <p>
5191+
* This method executes the {@code Consumer} on the current thread while
5192+
* {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the
5193+
* sequence.
5194+
* <dl>
5195+
* <dt><b>Scheduler:</b></dt>
5196+
* <dd>{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.</dd>
5197+
* <dt><b>Error handling:</b></dt>
5198+
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
5199+
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
5200+
* {@link Error}s are rethrown as they are.</dd>
5201+
* </dl>
5202+
*
5203+
* @param onNext
5204+
* the {@link Consumer} to invoke for each item emitted by the {@code Observable}
5205+
* @param capacityHint
5206+
* the number of items expected to be buffered (allows reducing buffer reallocations)
5207+
* @throws RuntimeException
5208+
* if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
5209+
* as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
5210+
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
5211+
* @see #subscribe(Consumer)
5212+
*/
5213+
@SchedulerSupport(SchedulerSupport.NONE)
5214+
@NonNull
5215+
public final void blockingForEach(@NonNull Consumer<? super T> onNext, int capacityHint) {
5216+
Iterator<T> it = blockingIterable(capacityHint).iterator();
51785217
while (it.hasNext()) {
51795218
try {
51805219
onNext.accept(it.next());

0 commit comments

Comments
 (0)