Skip to content

Commit ee004ad

Browse files
authored
2.x: dedicated Single.zip implementation, no dispose on all-success (#5027)
1 parent d93ee2b commit ee004ad

File tree

12 files changed

+852
-34
lines changed

12 files changed

+852
-34
lines changed

src/main/java/io/reactivex/Single.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex;
1515

16+
import java.util.NoSuchElementException;
1617
import java.util.concurrent.*;
1718

1819
import org.reactivestreams.Publisher;
@@ -1035,8 +1036,9 @@ public static <T> Single<T> wrap(SingleSource<T> source) {
10351036
@CheckReturnValue
10361037
@SchedulerSupport(SchedulerSupport.NONE)
10371038
public static <T, R> Single<R> zip(final Iterable<? extends SingleSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
1039+
ObjectHelper.requireNonNull(zipper, "zipper is null");
10381040
ObjectHelper.requireNonNull(sources, "sources is null");
1039-
return toSingle(Flowable.zipIterable(SingleInternalHelper.iterableToFlowable(sources), zipper, false, 1));
1041+
return RxJavaPlugins.onAssembly(new SingleZipIterable<T, R>(sources, zipper));
10401042
}
10411043

10421044
/**
@@ -1475,17 +1477,13 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Single<R> zip(
14751477
*/
14761478
@CheckReturnValue
14771479
@SchedulerSupport(SchedulerSupport.NONE)
1478-
@SuppressWarnings({"rawtypes", "unchecked"})
14791480
public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) {
1481+
ObjectHelper.requireNonNull(zipper, "zipper is null");
14801482
ObjectHelper.requireNonNull(sources, "sources is null");
1481-
Publisher[] sourcePublishers = new Publisher[sources.length];
1482-
int i = 0;
1483-
for (SingleSource<? extends T> s : sources) {
1484-
ObjectHelper.requireNonNull(s, "The " + i + "th source is null");
1485-
sourcePublishers[i] = RxJavaPlugins.onAssembly(new SingleToFlowable<T>(s));
1486-
i++;
1483+
if (sources.length == 0) {
1484+
return error(new NoSuchElementException());
14871485
}
1488-
return toSingle(Flowable.zipArray(zipper, false, 1, sourcePublishers));
1486+
return RxJavaPlugins.onAssembly(new SingleZipArray<T, R>(sources, zipper));
14891487
}
14901488

14911489
/**

src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ public R apply(T t) throws Exception {
5959
return;
6060
}
6161

62-
sources[i].subscribe(parent.observers[i]);
62+
MaybeSource<? extends T> source = sources[i];
63+
64+
if (source == null) {
65+
parent.innerError(new NullPointerException("One of the sources is null"), i);
66+
return;
67+
}
68+
source.subscribe(parent.observers[i]);
6369
}
6470
}
6571

src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ protected void subscribeActual(MaybeObserver<? super R> observer) {
4040

4141
try {
4242
for (MaybeSource<? extends T> source : sources) {
43+
if (source == null) {
44+
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
45+
return;
46+
}
4347
if (n == a.length) {
4448
a = Arrays.copyOf(a, n + (n >> 2));
4549
}

src/main/java/io/reactivex/internal/operators/single/SingleMap.java

+34-22
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,42 @@ public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends
3030

3131
@Override
3232
protected void subscribeActual(final SingleObserver<? super R> t) {
33-
source.subscribe(new SingleObserver<T>() {
34-
@Override
35-
public void onSubscribe(Disposable d) {
36-
t.onSubscribe(d);
37-
}
33+
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
34+
}
3835

39-
@Override
40-
public void onSuccess(T value) {
41-
R v;
42-
try {
43-
v = mapper.apply(value);
44-
} catch (Throwable e) {
45-
Exceptions.throwIfFatal(e);
46-
onError(e);
47-
return;
48-
}
49-
50-
t.onSuccess(v);
51-
}
36+
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
37+
38+
final SingleObserver<? super R> t;
39+
40+
final Function<? super T, ? extends R> mapper;
5241

53-
@Override
54-
public void onError(Throwable e) {
55-
t.onError(e);
42+
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
43+
this.t = t;
44+
this.mapper = mapper;
45+
}
46+
47+
@Override
48+
public void onSubscribe(Disposable d) {
49+
t.onSubscribe(d);
50+
}
51+
52+
@Override
53+
public void onSuccess(T value) {
54+
R v;
55+
try {
56+
v = mapper.apply(value);
57+
} catch (Throwable e) {
58+
Exceptions.throwIfFatal(e);
59+
onError(e);
60+
return;
5661
}
57-
});
62+
63+
t.onSuccess(v);
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
t.onError(e);
69+
}
5870
}
5971
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.Function;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
public final class SingleZipArray<T, R> extends Single<R> {
27+
28+
final SingleSource<? extends T>[] sources;
29+
30+
final Function<? super Object[], ? extends R> zipper;
31+
32+
public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
33+
this.sources = sources;
34+
this.zipper = zipper;
35+
}
36+
37+
@Override
38+
protected void subscribeActual(SingleObserver<? super R> observer) {
39+
SingleSource<? extends T>[] sources = this.sources;
40+
int n = sources.length;
41+
42+
43+
if (n == 1) {
44+
sources[0].subscribe(new SingleMap.MapSingleObserver<T, R>(observer, new Function<T, R>() {
45+
@Override
46+
public R apply(T t) throws Exception {
47+
return zipper.apply(new Object[] { t });
48+
}
49+
}));
50+
return;
51+
}
52+
53+
ZipCoordinator<T, R> parent = new ZipCoordinator<T, R>(observer, n, zipper);
54+
55+
observer.onSubscribe(parent);
56+
57+
for (int i = 0; i < n; i++) {
58+
if (parent.isDisposed()) {
59+
return;
60+
}
61+
62+
SingleSource<? extends T> source = sources[i];
63+
64+
if (source == null) {
65+
parent.innerError(new NullPointerException("One of the sources is null"), i);
66+
return;
67+
}
68+
69+
source.subscribe(parent.observers[i]);
70+
}
71+
}
72+
73+
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
74+
75+
76+
private static final long serialVersionUID = -5556924161382950569L;
77+
78+
final SingleObserver<? super R> actual;
79+
80+
final Function<? super Object[], ? extends R> zipper;
81+
82+
final ZipSingleObserver<T>[] observers;
83+
84+
final Object[] values;
85+
86+
@SuppressWarnings("unchecked")
87+
ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
88+
super(n);
89+
this.actual = observer;
90+
this.zipper = zipper;
91+
ZipSingleObserver<T>[] o = new ZipSingleObserver[n];
92+
for (int i = 0; i < n; i++) {
93+
o[i] = new ZipSingleObserver<T>(this, i);
94+
}
95+
this.observers = o;
96+
this.values = new Object[n];
97+
}
98+
99+
@Override
100+
public boolean isDisposed() {
101+
return get() <= 0;
102+
}
103+
104+
@Override
105+
public void dispose() {
106+
if (getAndSet(0) > 0) {
107+
for (ZipSingleObserver<?> d : observers) {
108+
d.dispose();
109+
}
110+
}
111+
}
112+
113+
void innerSuccess(T value, int index) {
114+
values[index] = value;
115+
if (decrementAndGet() == 0) {
116+
R v;
117+
118+
try {
119+
v = ObjectHelper.requireNonNull(zipper.apply(values), "The zipper returned a null value");
120+
} catch (Throwable ex) {
121+
Exceptions.throwIfFatal(ex);
122+
actual.onError(ex);
123+
return;
124+
}
125+
126+
actual.onSuccess(v);
127+
}
128+
}
129+
130+
void disposeExcept(int index) {
131+
ZipSingleObserver<T>[] observers = this.observers;
132+
int n = observers.length;
133+
for (int i = 0; i < index; i++) {
134+
observers[i].dispose();
135+
}
136+
for (int i = index + 1; i < n; i++) {
137+
observers[i].dispose();
138+
}
139+
}
140+
141+
void innerError(Throwable ex, int index) {
142+
if (getAndSet(0) > 0) {
143+
disposeExcept(index);
144+
actual.onError(ex);
145+
} else {
146+
RxJavaPlugins.onError(ex);
147+
}
148+
}
149+
}
150+
151+
static final class ZipSingleObserver<T>
152+
extends AtomicReference<Disposable>
153+
implements SingleObserver<T> {
154+
155+
private static final long serialVersionUID = 3323743579927613702L;
156+
157+
final ZipCoordinator<T, ?> parent;
158+
159+
final int index;
160+
161+
ZipSingleObserver(ZipCoordinator<T, ?> parent, int index) {
162+
this.parent = parent;
163+
this.index = index;
164+
}
165+
166+
public void dispose() {
167+
DisposableHelper.dispose(this);
168+
}
169+
170+
@Override
171+
public void onSubscribe(Disposable d) {
172+
DisposableHelper.setOnce(this, d);
173+
}
174+
175+
@Override
176+
public void onSuccess(T value) {
177+
parent.innerSuccess(value, index);
178+
}
179+
180+
@Override
181+
public void onError(Throwable e) {
182+
parent.innerError(e, index);
183+
}
184+
}
185+
}

0 commit comments

Comments
 (0)