Skip to content

Commit 4b612f6

Browse files
committed
Add QueueOverflowException, fix tests
1 parent cc04ccf commit 4b612f6

34 files changed

+103
-71
lines changed

src/main/java/io/reactivex/rxjava3/exceptions/MissingBackpressureException.java

-20
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,6 @@ public final class MissingBackpressureException extends RuntimeException {
2929
*/
3030
public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";
3131

32-
/**
33-
* The message for queue overflows.
34-
* <p>
35-
* This can happen if the upstream disregards backpressure completely or calls
36-
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
37-
* without synchronization. Rarely, it is an indication of bugs inside RxJava
38-
* @since 3.1.6
39-
*/
40-
public static final String QUEUE_OVERFLOW_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in RxJava";
41-
4232
/**
4333
* Constructs a MissingBackpressureException without message or cause.
4434
*/
@@ -63,14 +53,4 @@ public MissingBackpressureException(String message) {
6353
public static MissingBackpressureException createDefault() {
6454
return new MissingBackpressureException(DEFAULT_MESSAGE);
6555
}
66-
67-
/**
68-
* Constructs a new {@code MissingBackpressureException} with the
69-
* default message {@value #QUEUE_OVERFLOW_MESSAGE}.
70-
* @return the new {@code MissingBackpressureException} instance.
71-
* @since 3.1.6
72-
*/
73-
public static MissingBackpressureException createQueueOverflow() {
74-
return new MissingBackpressureException(QUEUE_OVERFLOW_MESSAGE);
75-
}
7656
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.exceptions;
15+
16+
/**
17+
* Indicates an overflow happened because the upstream disregarded backpressure completely or
18+
* {@link org.reactivestreams.Subscriber#onNext(Object)} was called concurrently from multiple threads
19+
* without synchronization. Rarely, it is an indication of bugs inside an operator.
20+
* @since 3.1.6
21+
*/
22+
public final class QueueOverflowException extends RuntimeException {
23+
24+
private static final long serialVersionUID = 8517344746016032542L;
25+
26+
/**
27+
* The message for queue overflows.
28+
* <p>
29+
* This can happen if the upstream disregards backpressure completely or calls
30+
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
31+
* without synchronization. Rarely, it is an indication of bugs inside an operator.
32+
*/
33+
private static final String DEFAULT_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in an operator";
34+
35+
/**
36+
* Constructs a QueueOverflowException with the default message.
37+
*/
38+
public QueueOverflowException() {
39+
this(DEFAULT_MESSAGE);
40+
}
41+
42+
/**
43+
* Constructs a QueueOverflowException with the given message but no cause.
44+
* @param message the error message
45+
*/
46+
public QueueOverflowException(String message) {
47+
super(message);
48+
}
49+
}

src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public void onNext(T t) {
174174
if (sourceMode != QueueFuseable.ASYNC) {
175175
if (!queue.offer(t)) {
176176
upstream.cancel();
177-
onError(MissingBackpressureException.createQueueOverflow());
177+
onError(new QueueOverflowException());
178178
return;
179179
}
180180
}

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
120120
public void onNext(CompletableSource t) {
121121
if (sourceFused == QueueSubscription.NONE) {
122122
if (!queue.offer(t)) {
123-
onError(MissingBackpressureException.createQueueOverflow());
123+
onError(new QueueOverflowException());
124124
return;
125125
}
126126
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void onNext(T t) {
140140
if (!queue.offer(t)) {
141141
SubscriptionHelper.cancel(this);
142142

143-
onError(MissingBackpressureException.createQueueOverflow());
143+
onError(new QueueOverflowException());
144144
} else {
145145
signalConsumer();
146146
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public final void onNext(T t) {
152152
if (sourceMode != QueueSubscription.ASYNC) {
153153
if (!queue.offer(t)) {
154154
upstream.cancel();
155-
onError(MissingBackpressureException.createQueueOverflow());
155+
onError(new QueueOverflowException());
156156
return;
157157
}
158158
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public final void onNext(T t) {
151151
if (sourceMode != QueueSubscription.ASYNC) {
152152
if (!queue.offer(t)) {
153153
upstream.cancel();
154-
onError(MissingBackpressureException.createQueueOverflow());
154+
onError(new QueueOverflowException());
155155
return;
156156
}
157157
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
243243
q = getMainQueue();
244244
}
245245
if (!q.offer(value)) {
246-
onError(MissingBackpressureException.createQueueOverflow());
246+
onError(new QueueOverflowException());
247247
}
248248
}
249249
if (decrementAndGet() == 0) {
@@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
252252
} else {
253253
SimpleQueue<U> q = getMainQueue();
254254
if (!q.offer(value)) {
255-
onError(MissingBackpressureException.createQueueOverflow());
255+
onError(new QueueOverflowException());
256256
return;
257257
}
258258
if (getAndIncrement() != 0) {
@@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
278278
inner.queue = q;
279279
}
280280
if (!q.offer(value)) {
281-
onError(MissingBackpressureException.createQueueOverflow());
281+
onError(new QueueOverflowException());
282282
}
283283
}
284284
if (decrementAndGet() == 0) {
@@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
291291
inner.queue = q;
292292
}
293293
if (!q.offer(value)) {
294-
onError(MissingBackpressureException.createQueueOverflow());
294+
onError(new QueueOverflowException());
295295
return;
296296
}
297297
if (getAndIncrement() != 0) {

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void onNext(T t) {
180180
return;
181181
}
182182
if (fusionMode == NONE && !queue.offer(t)) {
183-
onError(MissingBackpressureException.createQueueOverflow());
183+
onError(new QueueOverflowException());
184184
return;
185185
}
186186
drain();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public final void onNext(T t) {
113113
if (!queue.offer(t)) {
114114
upstream.cancel();
115115

116-
error = MissingBackpressureException.createQueueOverflow();
116+
error = new QueueOverflowException();
117117
done = true;
118118
}
119119
trySchedule();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
226226
public void onNext(T t) {
227227
// we expect upstream to honor backpressure requests
228228
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
229-
onError(MissingBackpressureException.createQueueOverflow());
229+
onError(new QueueOverflowException());
230230
return;
231231
}
232232
// since many things can happen concurrently, we have a common dispatch

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ public void onNext(R t) {
381381
SwitchMapSubscriber<T, R> p = parent;
382382
if (index == p.unique) {
383383
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
384-
onError(MissingBackpressureException.createQueueOverflow());
384+
onError(new QueueOverflowException());
385385
return;
386386
}
387387
p.drain();

src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.reactivestreams.Subscription;
1919

2020
import io.reactivex.rxjava3.core.FlowableSubscriber;
21-
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
21+
import io.reactivex.rxjava3.exceptions.*;
2222
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
2323
import io.reactivex.rxjava3.internal.util.*;
2424
import io.reactivex.rxjava3.operators.QueueFuseable;
@@ -99,7 +99,7 @@ public final void onNext(T t) {
9999
if (t != null) {
100100
if (!queue.offer(t)) {
101101
upstream.cancel();
102-
onError(MissingBackpressureException.createQueueOverflow());
102+
onError(new QueueOverflowException());
103103
return;
104104
}
105105
}

src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void onNext(T t) {
204204
if (sourceMode == QueueSubscription.NONE) {
205205
if (!queue.offer(t)) {
206206
upstream.cancel();
207-
onError(MissingBackpressureException.createQueueOverflow());
207+
onError(new QueueOverflowException());
208208
return;
209209
}
210210
}

src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.reactivestreams.*;
1919

2020
import io.reactivex.rxjava3.core.*;
21-
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
21+
import io.reactivex.rxjava3.exceptions.*;
2222
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
2323
import io.reactivex.rxjava3.internal.util.*;
2424
import io.reactivex.rxjava3.operators.SimplePlainQueue;
@@ -153,7 +153,7 @@ public void onNext(JoinInnerSubscriber<T> inner, T value) {
153153

154154
if (!q.offer(value)) {
155155
cancelAll();
156-
Throwable mbe = MissingBackpressureException.createQueueOverflow();
156+
Throwable mbe = new QueueOverflowException();
157157
if (errors.compareAndSet(null, mbe)) {
158158
downstream.onError(mbe);
159159
} else {
@@ -170,7 +170,7 @@ public void onNext(JoinInnerSubscriber<T> inner, T value) {
170170

171171
if (!q.offer(value)) {
172172
cancelAll();
173-
onError(MissingBackpressureException.createQueueOverflow());
173+
onError(new QueueOverflowException());
174174
return;
175175
}
176176

@@ -333,7 +333,7 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {
333333

334334
if (!q.offer(value)) {
335335
inner.cancel();
336-
errors.tryAddThrowableOrReport(MissingBackpressureException.createQueueOverflow());
336+
errors.tryAddThrowableOrReport(new QueueOverflowException());
337337
done.decrementAndGet();
338338
drainLoop();
339339
return;
@@ -347,7 +347,7 @@ void onNext(JoinInnerSubscriber<T> inner, T value) {
347347

348348
if (!q.offer(value)) {
349349
inner.cancel();
350-
errors.tryAddThrowableOrReport(MissingBackpressureException.createQueueOverflow());
350+
errors.tryAddThrowableOrReport(new QueueOverflowException());
351351
done.decrementAndGet();
352352
}
353353

src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import io.reactivex.rxjava3.core.*;
2121
import io.reactivex.rxjava3.core.Scheduler.Worker;
22-
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
22+
import io.reactivex.rxjava3.exceptions.*;
2323
import io.reactivex.rxjava3.internal.schedulers.SchedulerMultiWorkerSupport;
2424
import io.reactivex.rxjava3.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback;
2525
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
@@ -148,7 +148,7 @@ public final void onNext(T t) {
148148
}
149149
if (!queue.offer(t)) {
150150
upstream.cancel();
151-
onError(MissingBackpressureException.createQueueOverflow());
151+
onError(new QueueOverflowException());
152152
return;
153153
}
154154
schedule();

src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,13 @@ static boolean isBug(Throwable error) {
402402
return true;
403403
}
404404
// the sender didn't honor the request amount
405-
// it's either due to an operator bug or concurrent onNext
406405
if (error instanceof MissingBackpressureException) {
407406
return true;
408407
}
408+
// it's either due to an operator bug or concurrent onNext
409+
if (error instanceof QueueOverflowException) {
410+
return true;
411+
}
409412
// general protocol violations
410413
// it's either due to an operator bug or concurrent onNext
411414
if (error instanceof IllegalStateException) {

src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.reactivestreams.*;
2525

2626
import io.reactivex.rxjava3.core.*;
27-
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
27+
import io.reactivex.rxjava3.exceptions.QueueOverflowException;
2828
import io.reactivex.rxjava3.functions.*;
2929
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
3030
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
@@ -475,7 +475,7 @@ public Integer apply(Integer v) {
475475
int vc = ts.values().size();
476476
assertTrue("10 < " + vc, vc <= 10);
477477

478-
ts.assertError(MissingBackpressureException.class);
478+
ts.assertError(QueueOverflowException.class);
479479
}
480480

481481
@Test

src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
274274
}
275275
.flatMapStream(v -> Stream.of(1, 2), 1)
276276
.test(0)
277-
.assertFailure(MissingBackpressureException.class);
277+
.assertFailure(QueueOverflowException.class);
278278

279279
TestHelper.assertUndeliverable(errors, 0, TestException.class);
280280
});

src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void subscribe(Subscriber<? super Completable> s) {
5252
}), 1
5353
)
5454
.test()
55-
.assertFailure(MissingBackpressureException.class);
55+
.assertFailure(QueueOverflowException.class);
5656

5757
TestHelper.assertError(errors, 0, MissingBackpressureException.class);
5858
} finally {

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void emptyThrowsNoSuch() {
152152
it.next();
153153
}
154154

155-
@Test(expected = MissingBackpressureException.class)
155+
@Test(expected = QueueOverflowException.class)
156156
public void overflowQueue() {
157157
Iterator<Integer> it = new Flowable<Integer>() {
158158
@Override

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
624624
}
625625
.concatMap(Functions.justFunction(Flowable.just(2)), 8, ImmediateThinScheduler.INSTANCE)
626626
.test(0L)
627-
.assertFailure(MissingBackpressureException.class);
627+
.assertFailure(QueueOverflowException.class);
628628
}
629629

630630
@Test

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,7 @@ protected void subscribeActual(Subscriber<? super Integer> s) {
12911291
}
12921292
.concatMap(Functions.justFunction(Flowable.just(2)), 8)
12931293
.test(0L)
1294-
.assertFailure(MissingBackpressureException.class);
1294+
.assertFailure(QueueOverflowException.class);
12951295
}
12961296

12971297
@Test

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1391,7 +1391,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
13911391
}
13921392
.flatMap(v -> Flowable.just(v), 1)
13931393
.test(0L)
1394-
.assertFailure(MissingBackpressureException.class);
1394+
.assertFailure(QueueOverflowException.class);
13951395
}
13961396

13971397
@Test
@@ -1413,7 +1413,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
14131413
}
14141414
})
14151415
.test()
1416-
.assertFailure(MissingBackpressureException.class, 1);
1416+
.assertFailure(QueueOverflowException.class, 1);
14171417
}
14181418

14191419
@Test
@@ -1430,7 +1430,7 @@ protected void subscribeActual(@NonNull Subscriber<@NonNull ? super @NonNull Int
14301430
}
14311431
}, false, 1, 1)
14321432
.test(0L)
1433-
.assertFailure(MissingBackpressureException.class);
1433+
.assertFailure(QueueOverflowException.class);
14341434
}
14351435

14361436
@Test

0 commit comments

Comments
 (0)