diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 3f2b898aa2..683cbccf70 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1798,6 +1798,32 @@ public final Single doAfterSuccess(Consumer onAfterSuccess) { return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess(this, onAfterSuccess)); } + /** + * Registers an {@link Action} to be called after this Single invokes either onSuccess or onError. + * *

Note that the {@code doAfterSuccess} action is shared between subscriptions and as such + * should be thread-safe.

+ *

+ * + *

+ *
Scheduler:
+ *
{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onAfterTerminate + * an {@link Action} to be invoked when the source Single finishes + * @return a Single that emits the same items as the source Single, then invokes the + * {@link Action} + * @see ReactiveX operators documentation: Do + * @since 2.0.6 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Single doAfterTerminate(Action onAfterTerminate) { + ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); + return RxJavaPlugins.onAssembly(new SingleDoAfterTerminate(this, onAfterTerminate)); + } + /** * Calls the specified action after this Single signals onSuccess or onError or gets disposed by * the downstream. diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoAfterTerminate.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoAfterTerminate.java new file mode 100644 index 0000000000..b4b7d58f89 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoAfterTerminate.java @@ -0,0 +1,101 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.Single; +import io.reactivex.SingleObserver; +import io.reactivex.SingleSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Calls an action after pushing the current item or an error to the downstream. + * @param the value type + * @since 2.0.6 - experimental + */ +public final class SingleDoAfterTerminate extends Single { + + final SingleSource source; + + final Action onAfterTerminate; + + public SingleDoAfterTerminate(SingleSource source, Action onAfterTerminate) { + this.source = source; + this.onAfterTerminate = onAfterTerminate; + } + + @Override + protected void subscribeActual(SingleObserver s) { + source.subscribe(new DoAfterTerminateObserver(s, onAfterTerminate)); + } + + static final class DoAfterTerminateObserver implements SingleObserver, Disposable { + + final SingleObserver actual; + + final Action onAfterTerminate; + + Disposable d; + + DoAfterTerminateObserver(SingleObserver actual, Action onAfterTerminate) { + this.actual = actual; + this.onAfterTerminate = onAfterTerminate; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T t) { + actual.onSuccess(t); + + onAfterTerminate(); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + + onAfterTerminate(); + } + + @Override + public void dispose() { + d.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + private void onAfterTerminate() { + try { + onAfterTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDoAfterTerminateTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDoAfterTerminateTest.java new file mode 100644 index 0000000000..b1a55dc588 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDoAfterTerminateTest.java @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.Single; +import io.reactivex.SingleSource; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class SingleDoAfterTerminateTest { + + private final int[] call = { 0 }; + + private final Action afterTerminate = new Action() { + @Override + public void run() throws Exception { + call[0]++; + } + }; + + private final TestObserver ts = new TestObserver(); + + @Test + public void just() { + Single.just(1) + .doAfterTerminate(afterTerminate) + .subscribeWith(ts) + .assertResult(1); + + assertAfterTerminateCalledOnce(); + } + + @Test + public void error() { + Single.error(new TestException()) + .doAfterTerminate(afterTerminate) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertAfterTerminateCalledOnce(); + } + + @Test(expected = NullPointerException.class) + public void afterTerminateActionNull() { + Single.just(1).doAfterTerminate(null); + } + + @Test + public void justConditional() { + Single.just(1) + .doAfterTerminate(afterTerminate) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1); + + assertAfterTerminateCalledOnce(); + } + + @Test + public void errorConditional() { + Single.error(new TestException()) + .doAfterTerminate(afterTerminate) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertAfterTerminateCalledOnce(); + } + + @Test + public void actionThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Single.just(1) + .doAfterTerminate(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().doAfterTerminate(afterTerminate)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single m) throws Exception { + return m.doAfterTerminate(afterTerminate); + } + }); + } + + private void assertAfterTerminateCalledOnce() { + assertEquals(1, call[0]); + } +}