Skip to content

Commit 52346a1

Browse files
mgsholteakarnokd
authored andcommitted
2.x: Zip, CombineLatest, and Amb operators throw when supplied with ObservableSource implementation that doesn't subclass Observable (#6754)
* 2.x: Zip, CombineLatest, and Amb operators throw when supplied with ObservableSource implementation that doesn't subclass Observable #6753 * 2.x: add tests for allowing arbitrary ObservableSource implementations
1 parent 31b407f commit 52346a1

File tree

6 files changed

+78
-3
lines changed

6 files changed

+78
-3
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void subscribeActual(Observer<? super T> observer) {
3636
ObservableSource<? extends T>[] sources = this.sources;
3737
int count = 0;
3838
if (sources == null) {
39-
sources = new Observable[8];
39+
sources = new ObservableSource[8];
4040
try {
4141
for (ObservableSource<? extends T> p : sourcesIterable) {
4242
if (p == null) {

src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void subscribeActual(Observer<? super R> observer) {
4949
ObservableSource<? extends T>[] sources = this.sources;
5050
int count = 0;
5151
if (sources == null) {
52-
sources = new Observable[8];
52+
sources = new ObservableSource[8];
5353
for (ObservableSource<? extends T> p : sourcesIterable) {
5454
if (count == sources.length) {
5555
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];

src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void subscribeActual(Observer<? super R> observer) {
5050
ObservableSource<? extends T>[] sources = this.sources;
5151
int count = 0;
5252
if (sources == null) {
53-
sources = new Observable[8];
53+
sources = new ObservableSource[8];
5454
for (ObservableSource<? extends T> p : sourcesIterable) {
5555
if (count == sources.length) {
5656
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];

src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,23 @@ public void singleIterable() {
264264
.assertResult(1);
265265
}
266266

267+
/**
268+
* Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable
269+
*/
270+
@Test
271+
public void singleIterableNotSubclassingObservable() {
272+
final ObservableSource<Integer> s1 = new ObservableSource<Integer>() {
273+
@Override
274+
public void subscribe (final Observer<? super Integer> observer) {
275+
Observable.just(1).subscribe(observer);
276+
}
277+
};
278+
279+
Observable.amb(Collections.singletonList(s1))
280+
.test()
281+
.assertResult(1);
282+
}
283+
267284
@SuppressWarnings("unchecked")
268285
@Test
269286
public void disposed() {

src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,34 @@ public Object apply(Object[] a) throws Exception {
787787
.assertResult("[1, 2]");
788788
}
789789

790+
/**
791+
* Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable
792+
*/
793+
@Test
794+
public void combineLatestIterableOfSourcesNotSubclassingObservable() {
795+
final ObservableSource<Integer> s1 = new ObservableSource<Integer>() {
796+
@Override
797+
public void subscribe (final Observer<? super Integer> observer) {
798+
Observable.just(1).subscribe(observer);
799+
}
800+
};
801+
final ObservableSource<Integer> s2 = new ObservableSource<Integer>() {
802+
@Override
803+
public void subscribe (final Observer<? super Integer> observer) {
804+
Observable.just(2).subscribe(observer);
805+
}
806+
};
807+
808+
Observable.combineLatest(Arrays.asList(s1, s2), new Function<Object[], Object>() {
809+
@Override
810+
public Object apply(Object[] a) throws Exception {
811+
return Arrays.toString(a);
812+
}
813+
})
814+
.test()
815+
.assertResult("[1, 2]");
816+
}
817+
790818
@Test
791819
@SuppressWarnings("unchecked")
792820
public void combineLatestDelayErrorArrayOfSources() {
@@ -1216,4 +1244,5 @@ public Object apply(Object[] a) throws Exception {
12161244
.awaitDone(5, TimeUnit.SECONDS)
12171245
.assertFailure(TestException.class, 42);
12181246
}
1247+
12191248
}

src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,34 @@ public Object apply(Object[] a) throws Exception {
13501350
.assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]");
13511351
}
13521352

1353+
/**
1354+
* Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable
1355+
*/
1356+
@Test
1357+
public void zipIterableNotSubclassingObservable() {
1358+
final ObservableSource<Integer> s1 = new ObservableSource<Integer>() {
1359+
@Override
1360+
public void subscribe (final Observer<? super Integer> observer) {
1361+
Observable.just(1).subscribe(observer);
1362+
}
1363+
};
1364+
final ObservableSource<Integer> s2 = new ObservableSource<Integer>() {
1365+
@Override
1366+
public void subscribe (final Observer<? super Integer> observer) {
1367+
Observable.just(2).subscribe(observer);
1368+
}
1369+
};
1370+
1371+
Observable.zip(Arrays.asList(s1, s2), new Function<Object[], Object>() {
1372+
@Override
1373+
public Object apply(Object[] a) throws Exception {
1374+
return Arrays.toString(a);
1375+
}
1376+
})
1377+
.test()
1378+
.assertResult("[1, 2]");
1379+
}
1380+
13531381
@Test
13541382
public void dispose() {
13551383
TestHelper.checkDisposed(Observable.zip(Observable.just(1), Observable.just(1), new BiFunction<Integer, Integer, Object>() {
@@ -1457,4 +1485,5 @@ public Object apply(Object[] a) throws Exception {
14571485

14581486
assertEquals(0, counter.get());
14591487
}
1488+
14601489
}

0 commit comments

Comments
 (0)