diff --git a/src/main/java/io/reactivex/processors/MulticastProcessor.java b/src/main/java/io/reactivex/processors/MulticastProcessor.java index 317244313f..31b7a64fea 100644 --- a/src/main/java/io/reactivex/processors/MulticastProcessor.java +++ b/src/main/java/io/reactivex/processors/MulticastProcessor.java @@ -569,6 +569,7 @@ void drain() { } } + consumed = c; missed = wip.addAndGet(-missed); if (missed == 0) { break; diff --git a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java index 63a7b1a60b..d541dffb3e 100644 --- a/src/test/java/io/reactivex/processors/MulticastProcessorTest.java +++ b/src/test/java/io/reactivex/processors/MulticastProcessorTest.java @@ -783,4 +783,41 @@ public void noUpstream() { assertTrue(mp.hasSubscribers()); } + @Test + public void requestUpstreamPrefetchNonFused() { + for (int j = 1; j < 12; j++) { + MulticastProcessor mp = MulticastProcessor.create(j, true); + + TestSubscriber ts = mp.test(0).withTag("Prefetch: " + j); + + Flowable.range(1, 10).hide().subscribe(mp); + + ts.assertEmpty() + .requestMore(3) + .assertValuesOnly(1, 2, 3) + .requestMore(3) + .assertValuesOnly(1, 2, 3, 4, 5, 6) + .requestMore(4) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + } + + @Test + public void requestUpstreamPrefetchNonFused2() { + for (int j = 1; j < 12; j++) { + MulticastProcessor mp = MulticastProcessor.create(j, true); + + TestSubscriber ts = mp.test(0).withTag("Prefetch: " + j); + + Flowable.range(1, 10).hide().subscribe(mp); + + ts.assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(6) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + } }