Skip to content

3.x: Add missing throwIfFatal calls #6801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.reactivex.rxjava3.disposables;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;

/**
* A disposable container that manages an {@link AutoCloseable} instance.
Expand All @@ -33,7 +33,7 @@ protected void onDisposed(@NonNull AutoCloseable value) {
try {
value.close();
} catch (Throwable ex) {
RxJavaPlugins.onError(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
Expand Down Expand Up @@ -61,6 +61,7 @@ protected void subscribeActual(Subscriber<? super R> s) {
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Expand Down Expand Up @@ -243,6 +244,7 @@ void drain() {
try {
t = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
trySignalError(downstream, ex);
continue;
}
Expand Down Expand Up @@ -271,6 +273,7 @@ else if (!isEmpty) {
iterator = null;
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
trySignalError(downstream, ex);
}
continue;
Expand All @@ -282,6 +285,7 @@ else if (!isEmpty) {
try {
item = Objects.requireNonNull(iterator.next(), "The Stream.Iterator returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
trySignalError(downstream, ex);
continue;
}
Expand All @@ -297,6 +301,7 @@ else if (!isEmpty) {
clearCurrentRethrowCloseError();
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
trySignalError(downstream, ex);
}
}
Expand Down Expand Up @@ -328,6 +333,7 @@ void clearCurrentSuppressCloseError() {
try {
clearCurrentRethrowCloseError();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ protected void subscribeActual(Observer<? super R> observer) {
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ protected void subscribeActual(Subscriber<? super U> s) {
try {
u = Objects.requireNonNull(initialSupplier.get(), "The initial value supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ protected void subscribeActual(SingleObserver<? super U> observer) {
try {
u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public T poll() throws Throwable {
try {
onError.accept(ex);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
throw new CompositeException(ex, exc);
}
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
Expand All @@ -173,6 +174,7 @@ public T poll() throws Throwable {
try {
onError.accept(ex);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
throw new CompositeException(ex, exc);
}
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
Expand Down Expand Up @@ -314,6 +316,7 @@ public T poll() throws Throwable {
try {
onError.accept(ex);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
throw new CompositeException(ex, exc);
}
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
Expand All @@ -328,6 +331,7 @@ public T poll() throws Throwable {
try {
onError.accept(ex);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
throw new CompositeException(ex, exc);
}
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public void connect(Consumer<? super Disposable> connection) {
try {
connection.accept(ps);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (doConnect) {
ps.shouldConnect.compareAndSet(true, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void subscribeActual(Subscriber<? super R> s) {
try {
other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null Publisher");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ void drain() {
try {
endSource = Objects.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
startSubscriber.cancel();
resources.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ public void onNext(T t) {
try {
u = Objects.requireNonNull(iterator.next(), "The iterator returned a null value");
} catch (Throwable e) {
error(e);
fail(e);
return;
}

V v;
try {
v = Objects.requireNonNull(zipper.apply(t, u), "The zipper function returned a null value");
} catch (Throwable e) {
error(e);
fail(e);
return;
}

Expand All @@ -120,7 +120,7 @@ public void onNext(T t) {
try {
b = iterator.hasNext();
} catch (Throwable e) {
error(e);
fail(e);
return;
}

Expand All @@ -131,7 +131,7 @@ public void onNext(T t) {
}
}

void error(Throwable e) {
void fail(Throwable e) {
Exceptions.throwIfFatal(e);
done = true;
upstream.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {
v = future.get(timeout, unit);
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (ex instanceof ExecutionException) {
ex = ex.getCause();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void onNext(T t) {
try {
b = ExceptionHelper.nullCheck(bufferSupplier.get(), "The bufferSupplier returned a null Collection.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
buffers.clear();
upstream.dispose();
downstream.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Expand All @@ -37,6 +38,7 @@ protected void subscribeActual(Observer<? super U> t) {
try {
u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, t);
return;
}
Expand Down Expand Up @@ -86,6 +88,7 @@ public void onNext(T t) {
try {
collector.accept(u, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.internal.fuseable.FuseToObservable;
Expand Down Expand Up @@ -41,6 +42,7 @@ protected void subscribeActual(SingleObserver<? super U> t) {
try {
u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, t);
return;
}
Expand Down Expand Up @@ -94,6 +96,7 @@ public void onNext(T t) {
try {
collector.accept(u, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public void connect(Consumer<? super Disposable> connection) {
try {
connection.accept(ps);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (doConnect) {
ps.shouldConnect.compareAndSet(true, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public void subscribeActual(Observer<? super R> observer) {
try {
other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ void drain() {
try {
endSource = Objects.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null ObservableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
startObserver.dispose();
resources.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -56,6 +57,7 @@ public Void call() throws Exception {
setRest(executor.submit(this));
runner = null;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
runner = null;
RxJavaPlugins.onError(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.reactivex.rxjava3.internal.schedulers;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
Expand All @@ -38,6 +39,7 @@ public void run() {
runnable.run();
runner = null;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
runner = null;
lazySet(FINISHED);
RxJavaPlugins.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;

/**
Expand Down Expand Up @@ -108,6 +109,7 @@ static int getIntProperty(boolean enabled, String key, int defaultNotFound, int
}
return Integer.parseInt(value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
return defaultNotFound;
}
}
Expand All @@ -123,6 +125,7 @@ static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNo
}
return "true".equals(value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
return defaultNotFound;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,20 @@ public void fromAutoCloseableThrows() throws Throwable {

assertTrue(errors.isEmpty());

d.dispose();
try {
d.dispose();
fail("Should have thrown!");
} catch (TestException expected) {
// expected
}

assertTrue(d.isDisposed());
assertEquals(1, errors.size());

d.dispose();

assertTrue(d.isDisposed());
assertEquals(1, errors.size());

TestHelper.assertUndeliverable(errors, 0, TestException.class);
assertTrue(errors.isEmpty());
});
}

Expand Down
Loading