Skip to content

Commit daeccff

Browse files
authored
3.x: Fix Observable.window (size & time) cancellation and abandonment (#6761)
1 parent df7f1cd commit daeccff

File tree

8 files changed

+774
-479
lines changed

8 files changed

+774
-479
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ public void onNext(T t) {
9292
long i = index;
9393

9494
UnicastProcessor<T> w = window;
95-
WindowSubscribeIntercept<T> intercept = null;
95+
FlowableWindowSubscribeIntercept<T> intercept = null;
9696
if (i == 0) {
9797
getAndIncrement();
9898

9999
w = UnicastProcessor.<T>create(bufferSize, this);
100100
window = w;
101101

102-
intercept = new WindowSubscribeIntercept<T>(w);
102+
intercept = new FlowableWindowSubscribeIntercept<T>(w);
103103
downstream.onNext(intercept);
104104
}
105105

@@ -211,15 +211,15 @@ public void onSubscribe(Subscription s) {
211211
public void onNext(T t) {
212212
long i = index;
213213

214-
WindowSubscribeIntercept<T> intercept = null;
214+
FlowableWindowSubscribeIntercept<T> intercept = null;
215215
UnicastProcessor<T> w = window;
216216
if (i == 0) {
217217
getAndIncrement();
218218

219219
w = UnicastProcessor.<T>create(bufferSize, this);
220220
window = w;
221221

222-
intercept = new WindowSubscribeIntercept<T>(w);
222+
intercept = new FlowableWindowSubscribeIntercept<T>(w);
223223
downstream.onNext(intercept);
224224
}
225225

@@ -477,7 +477,7 @@ void drain() {
477477
break;
478478
}
479479

480-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(t);
480+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(t);
481481
a.onNext(intercept);
482482

483483
if (intercept.tryAbandon()) {

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/WindowSubscribeIntercept.java renamed to src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowSubscribeIntercept.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
* @param <T> the element type of the flow.
2626
* @since 3.0.0
2727
*/
28-
final class WindowSubscribeIntercept<T> extends Flowable<T> {
28+
final class FlowableWindowSubscribeIntercept<T> extends Flowable<T> {
2929

3030
final FlowableProcessor<T> window;
3131

3232
final AtomicBoolean once;
3333

34-
WindowSubscribeIntercept(FlowableProcessor<T> source) {
34+
FlowableWindowSubscribeIntercept(FlowableProcessor<T> source) {
3535
this.window = source;
3636
this.once = new AtomicBoolean();
3737
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java

+12-15
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
2929
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
3030
import io.reactivex.rxjava3.processors.UnicastProcessor;
31-
import io.reactivex.rxjava3.subscribers.SerializedSubscriber;
3231

3332
public final class FlowableWindowTimed<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
3433
final long timespan;
@@ -53,23 +52,21 @@ public FlowableWindowTimed(Flowable<T> source,
5352
}
5453

5554
@Override
56-
protected void subscribeActual(Subscriber<? super Flowable<T>> s) {
57-
SerializedSubscriber<Flowable<T>> actual = new SerializedSubscriber<Flowable<T>>(s);
58-
55+
protected void subscribeActual(Subscriber<? super Flowable<T>> downstream) {
5956
if (timespan == timeskip) {
6057
if (maxSize == Long.MAX_VALUE) {
6158
source.subscribe(new WindowExactUnboundedSubscriber<T>(
62-
actual,
59+
downstream,
6360
timespan, unit, scheduler, bufferSize));
6461
return;
6562
}
6663
source.subscribe(new WindowExactBoundedSubscriber<T>(
67-
actual,
64+
downstream,
6865
timespan, unit, scheduler,
6966
bufferSize, maxSize, restartTimerOnMaxSize));
7067
return;
7168
}
72-
source.subscribe(new WindowSkipSubscriber<T>(actual,
69+
source.subscribe(new WindowSkipSubscriber<T>(downstream,
7370
timespan, timeskip, unit, scheduler.createWorker(), bufferSize));
7471
}
7572

@@ -100,8 +97,8 @@ abstract static class AbstractWindowSubscriber<T>
10097

10198
final AtomicInteger windowCount;
10299

103-
AbstractWindowSubscriber(Subscriber<? super Flowable<T>> actual, long timespan, TimeUnit unit, int bufferSize) {
104-
this.downstream = actual;
100+
AbstractWindowSubscriber(Subscriber<? super Flowable<T>> downstream, long timespan, TimeUnit unit, int bufferSize) {
101+
this.downstream = downstream;
105102
this.queue = new MpscLinkedQueue<Object>();
106103
this.timespan = timespan;
107104
this.unit = unit;
@@ -204,7 +201,7 @@ void createFirstWindow() {
204201

205202
emitted = 1;
206203

207-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(window);
204+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(window);
208205
downstream.onNext(intercept);
209206

210207
timer.replace(scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit));
@@ -293,7 +290,7 @@ else if (!isEmpty) {
293290
window = UnicastProcessor.create(bufferSize, windowRunnable);
294291
this.window = window;
295292

296-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(window);
293+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(window);
297294
downstream.onNext(intercept);
298295

299296
if (intercept.tryAbandon()) {
@@ -372,7 +369,7 @@ void createFirstWindow() {
372369
windowCount.getAndIncrement();
373370
window = UnicastProcessor.create(bufferSize, this);
374371

375-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(window);
372+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(window);
376373
downstream.onNext(intercept);
377374

378375
Runnable boundaryTask = new WindowBoundaryRunnable(this, 1L);
@@ -510,7 +507,7 @@ UnicastProcessor<T> createNewWindow(UnicastProcessor<T> window) {
510507
window = UnicastProcessor.create(bufferSize, this);
511508
this.window = window;
512509

513-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(window);
510+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(window);
514511
downstream.onNext(intercept);
515512

516513
if (restartTimerOnMaxSize) {
@@ -573,7 +570,7 @@ void createFirstWindow() {
573570
UnicastProcessor<T> window = UnicastProcessor.create(bufferSize, this);
574571
windows.add(window);
575572

576-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(window);
573+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(window);
577574
downstream.onNext(intercept);
578575

579576
worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit);
@@ -647,7 +644,7 @@ void drain() {
647644
UnicastProcessor<T> window = UnicastProcessor.create(bufferSize, this);
648645
windows.add(window);
649646

650-
WindowSubscribeIntercept<T> intercept = new WindowSubscribeIntercept<T>(window);
647+
FlowableWindowSubscribeIntercept<T> intercept = new FlowableWindowSubscribeIntercept<T>(window);
651648
downstream.onNext(intercept);
652649

653650
worker.schedule(new WindowBoundaryRunnable(this, false), timespan, unit);

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindow.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,17 @@ public void onSubscribe(Disposable d) {
7777
@Override
7878
public void onNext(T t) {
7979
UnicastSubject<T> w = window;
80+
ObservableWindowSubscribeIntercept<T> intercept = null;
8081
if (w == null && !cancelled) {
8182
w = UnicastSubject.create(capacityHint, this);
8283
window = w;
83-
downstream.onNext(w);
84+
intercept = new ObservableWindowSubscribeIntercept<T>(w);
85+
downstream.onNext(intercept);
8486
}
8587

8688
if (w != null) {
8789
w.onNext(t);
90+
8891
if (++size >= count) {
8992
size = 0;
9093
window = null;
@@ -93,6 +96,12 @@ public void onNext(T t) {
9396
upstream.dispose();
9497
}
9598
}
99+
100+
if (intercept != null && intercept.tryAbandon()) {
101+
w.onComplete();
102+
w = null;
103+
window = null;
104+
}
96105
}
97106
}
98107

@@ -180,11 +189,14 @@ public void onNext(T t) {
180189

181190
long s = skip;
182191

192+
ObservableWindowSubscribeIntercept<T> intercept = null;
193+
183194
if (i % s == 0 && !cancelled) {
184195
wip.getAndIncrement();
185196
UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
197+
intercept = new ObservableWindowSubscribeIntercept<T>(w);
186198
ws.offer(w);
187-
downstream.onNext(w);
199+
downstream.onNext(intercept);
188200
}
189201

190202
long c = firstEmission + 1;
@@ -205,6 +217,10 @@ public void onNext(T t) {
205217
}
206218

207219
index = i + 1;
220+
221+
if (intercept != null && intercept.tryAbandon()) {
222+
intercept.window.onComplete();
223+
}
208224
}
209225

210226
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.observable;
15+
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
18+
import io.reactivex.rxjava3.core.*;
19+
import io.reactivex.rxjava3.subjects.Subject;
20+
21+
/**
22+
* Wrapper for a Subject that detects an incoming subscriber.
23+
* @param <T> the element type of the flow.
24+
* @since 3.0.0
25+
*/
26+
final class ObservableWindowSubscribeIntercept<T> extends Observable<T> {
27+
28+
final Subject<T> window;
29+
30+
final AtomicBoolean once;
31+
32+
ObservableWindowSubscribeIntercept(Subject<T> source) {
33+
this.window = source;
34+
this.once = new AtomicBoolean();
35+
}
36+
37+
@Override
38+
protected void subscribeActual(Observer<? super T> s) {
39+
window.subscribe(s);
40+
once.set(true);
41+
}
42+
43+
boolean tryAbandon() {
44+
return !once.get() && once.compareAndSet(false, true);
45+
}
46+
}

0 commit comments

Comments
 (0)