Skip to content

Commit 28e24dc

Browse files
authored
3.x: Fix refCount not resetting when termination triggers cross-cancel (#6609)
1 parent 1144ebc commit 28e24dc

File tree

4 files changed

+53
-14
lines changed

4 files changed

+53
-14
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,15 @@ void cancel(RefConnection rc) {
115115

116116
void terminated(RefConnection rc) {
117117
synchronized (this) {
118-
if (connection != null && connection == rc) {
119-
connection = null;
118+
if (connection == rc) {
120119
if (rc.timer != null) {
121120
rc.timer.dispose();
121+
rc.timer = null;
122+
}
123+
if (--rc.subscriberCount == 0) {
124+
connection = null;
125+
source.reset();
122126
}
123-
}
124-
if (--rc.subscriberCount == 0) {
125-
source.reset();
126127
}
127128
}
128129
}

src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,15 @@ void cancel(RefConnection rc) {
112112

113113
void terminated(RefConnection rc) {
114114
synchronized (this) {
115-
if (connection != null && connection == rc) {
116-
connection = null;
115+
if (connection == rc) {
117116
if (rc.timer != null) {
118117
rc.timer.dispose();
118+
rc.timer = null;
119+
}
120+
if (--rc.subscriberCount == 0) {
121+
connection = null;
122+
source.reset();
119123
}
120-
}
121-
if (--rc.subscriberCount == 0) {
122-
source.reset();
123124
}
124125
}
125126
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.any;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.io.IOException;
@@ -1454,4 +1455,22 @@ public void publishRefCountShallBeThreadSafe() {
14541455
.assertComplete();
14551456
}
14561457
}
1457-
}
1458+
1459+
@Test
1460+
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
1461+
ReplayProcessor<Integer> rp = ReplayProcessor.create();
1462+
rp.onNext(1);
1463+
rp.onComplete();
1464+
1465+
Flowable<Integer> shared = rp.share();
1466+
1467+
shared
1468+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1469+
.test()
1470+
.assertValueCount(2);
1471+
1472+
shared
1473+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1474+
.test()
1475+
.assertValueCount(2);
1476+
}}

src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.any;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.io.IOException;
@@ -22,7 +23,7 @@
2223
import java.util.concurrent.*;
2324
import java.util.concurrent.atomic.*;
2425

25-
import org.junit.Test;
26+
import org.junit.*;
2627
import org.mockito.InOrder;
2728

2829
import io.reactivex.*;
@@ -1267,8 +1268,6 @@ public void cancelTerminateStateExclusion() {
12671268
.publish()
12681269
.refCount();
12691270

1270-
o.cancel(null);
1271-
12721271
o.cancel(new RefConnection(o));
12731272

12741273
RefConnection rc = new RefConnection(o);
@@ -1412,4 +1411,23 @@ public void publishRefCountShallBeThreadSafe() {
14121411
.assertComplete();
14131412
}
14141413
}
1414+
1415+
@Test
1416+
public void upstreamTerminationTriggersAnotherCancel() throws Exception {
1417+
ReplaySubject<Integer> rs = ReplaySubject.create();
1418+
rs.onNext(1);
1419+
rs.onComplete();
1420+
1421+
Observable<Integer> shared = rs.share();
1422+
1423+
shared
1424+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1425+
.test()
1426+
.assertValueCount(2);
1427+
1428+
shared
1429+
.buffer(shared.debounce(5, TimeUnit.SECONDS))
1430+
.test()
1431+
.assertValueCount(2);
1432+
}
14151433
}

0 commit comments

Comments
 (0)