Question about interrupted threads in RxJava2 #5024

Closed
ahmedre opened this issue Jan 27, 2017 · 7 comments
Comments

@ahmedre
Contributor

ahmedre commented Jan 27, 2017

Hi,
I have a question that's similar to #4863, but a bit simpler. I am running into a strange issue that I have reduced to a small test case: sometimes, by the time my most downstream method is called, the thread is already interrupted. This causes problems because the code I call there bails out if the thread is interrupted.

I can reproduce it with this simplified code:

  // Simulates one slow data source running on the io scheduler.
  private Single<ArrayList<Integer>> fakeDataPiece() {
    return Single.fromCallable(() -> {
      Thread.sleep(500);
      return new ArrayList<Integer>();
    }).subscribeOn(Schedulers.io());
  }

  // Zips three sources; the zipped values themselves are ignored in this repro.
  private Single<BookmarkData> fakeGetData() {
    return Single.zip(fakeDataPiece(), fakeDataPiece(), fakeDataPiece(),
        (integers, integers2, integers3) ->
            new BookmarkData(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()))
        .subscribeOn(Schedulers.io());
  }

  // exportBookmarks() is where the thread is observed as interrupted.
  public Single<Uri> exportBookmarksObservable() {
    return fakeGetData()
        .flatMap(bookmarkData -> Single.just(exportBookmarks(bookmarkData)))
        .subscribeOn(Schedulers.io());
  }

By the time exportBookmarks is called, Thread.currentThread().isInterrupted() returns true in many cases.

My question is: why is the thread interrupted? I am curious whether this is expected, or whether I am misunderstanding something or doing something wrong.

Observations:

  1. If I remove all the subscribeOn calls except the one in exportBookmarksObservable, I don't see the issue. This does change the behavior a bit: all the work then runs on the same thread, instead of each piece potentially running on a different thread.
  2. If I add a doOnSuccess in fakeGetData(), the thread is already interrupted there, and it is the same thread that exportBookmarks is later called on. It is also the same thread as one of the fakeDataPiece threads, which is not interrupted at the time that piece completes.
  3. Calling Thread.interrupted(), as suggested in #4863 ("Get RxCachedThreadScheduler-n when calling Disposable.dispose()"), also fixes the problem.
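
A minimal sketch of why observation 3 helps, using only the JDK: Thread.interrupted() returns the current thread's interrupt status and clears it, whereas isInterrupted() only reads it. The class and method names here (InterruptFlagDemo, exportIfNotInterrupted, runWithStaleInterrupt) are hypothetical stand-ins for illustration, not code from the project, and the leftover interrupt is simulated by hand rather than produced by RxJava.

```java
class InterruptFlagDemo {
    // Hypothetical stand-in for downstream code that bails when interrupted.
    static String exportIfNotInterrupted() {
        if (Thread.currentThread().isInterrupted()) {
            return "bailed";
        }
        return "exported";
    }

    // Simulates a leftover interrupt on the current thread, then runs the
    // downstream code with or without clearing the flag first.
    static String runWithStaleInterrupt(boolean clearFirst) {
        Thread.currentThread().interrupt(); // simulate the stale interrupt
        try {
            if (clearFirst) {
                // Reads the interrupt status and resets it to false.
                Thread.interrupted();
            }
            return exportIfNotInterrupted();
        } finally {
            // Leave the calling thread clean regardless of the path taken.
            Thread.interrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithStaleInterrupt(false)); // prints "bailed"
        System.out.println(runWithStaleInterrupt(true));  // prints "exported"
    }
}
```

Clearing the flag this way hides the symptom only; it does not explain where the interrupt came from.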

I am using RxJava 2.0.4. Thanks!

@akarnokd
Member

Could you create a self-contained unit test (in a gist)? Or you could apply doOnDispose() everywhere to see if the problem is due to internal disposing.

@ahmedre
Contributor Author

ahmedre commented Jan 27, 2017

Here is a self-contained unit test that fails:
https://gist.github.com/ahmedre/b7267d020b284faa5552c090b79707ad

With respect to doOnDispose, I find that it is called only for the three sources passed to zip, on the same thread for all three, and with isInterrupted() returning true for the second and third sources.

@akarnokd
Member

Could you post the stack trace at those doOnDispose places to see what is calling through them?

@ahmedre
Contributor Author

ahmedre commented Jan 27, 2017

sure:

fakeDataPiece: Thread[RxCachedThreadScheduler-2,5,main], false
java.lang.Thread.getStackTrace(Thread.java:1556)
com.quran.labs.androidquran.model.bookmark.ContrivedTest.lambda$fakeDataPiece$1(ContrivedTest.java:15)
io.reactivex.internal.operators.single.SingleDoOnDispose$DoOnDisposeObserver.dispose(SingleDoOnDispose.java:60)
io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.cancel(SingleToFlowable.java:74)
io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:188)
io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.cancel(FlowableZip.java:404)
io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.cancelAll(FlowableZip.java:161)
io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.drain(FlowableZip.java:208)
io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:388)
io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:117)
io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:63)
io.reactivex.internal.operators.single.SingleDoOnDispose$DoOnDisposeObserver.onSuccess(SingleDoOnDispose.java:84)
io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.onSuccess(SingleSubscribeOn.java:68)
io.reactivex.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:37)
io.reactivex.Single.subscribe(Single.java:2656)
io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
io.reactivex.Scheduler$1.run(Scheduler.java:134)
io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

fakeDataPiece: Thread[RxCachedThreadScheduler-2,5,main], true
java.lang.Thread.getStackTrace(Thread.java:1556)
com.quran.labs.androidquran.model.bookmark.ContrivedTest.lambda$fakeDataPiece$1(ContrivedTest.java:15)
io.reactivex.internal.operators.single.SingleDoOnDispose$DoOnDisposeObserver.dispose(SingleDoOnDispose.java:60)
io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.cancel(SingleToFlowable.java:74)
io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:188)
io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.cancel(FlowableZip.java:404)
io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.cancelAll(FlowableZip.java:161)
io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.drain(FlowableZip.java:208)
io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:388)
io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:117)
io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:63)
io.reactivex.internal.operators.single.SingleDoOnDispose$DoOnDisposeObserver.onSuccess(SingleDoOnDispose.java:84)
io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.onSuccess(SingleSubscribeOn.java:68)
io.reactivex.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:37)
io.reactivex.Single.subscribe(Single.java:2656)
io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
io.reactivex.Scheduler$1.run(Scheduler.java:134)
io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

fakeDataPiece: Thread[RxCachedThreadScheduler-2,5,main], true
java.lang.Thread.getStackTrace(Thread.java:1556)
com.quran.labs.androidquran.model.bookmark.ContrivedTest.lambda$fakeDataPiece$1(ContrivedTest.java:15)
io.reactivex.internal.operators.single.SingleDoOnDispose$DoOnDisposeObserver.dispose(SingleDoOnDispose.java:60)
io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.cancel(SingleToFlowable.java:74)
io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:188)
io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.cancel(FlowableZip.java:404)
io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.cancelAll(FlowableZip.java:161)
io.reactivex.internal.operators.flowable.FlowableZip$ZipCoordinator.drain(FlowableZip.java:208)
io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:388)
io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:117)
io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:63)
io.reactivex.internal.operators.single.SingleDoOnDispose$DoOnDisposeObserver.onSuccess(SingleDoOnDispose.java:84)
io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.onSuccess(SingleSubscribeOn.java:68)
io.reactivex.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:37)
io.reactivex.Single.subscribe(Single.java:2656)
io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
io.reactivex.Scheduler$1.run(Scheduler.java:134)
io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

@akarnokd
Member

Thanks, I see the problem now: Single.zip delegates to Flowable.zip, which cancels the sources when one of them completes before the others do.

This has to be fixed at the operator level, so I can't give you any workaround other than clearing the interrupted flag, or a custom operator that suppresses cancellation if it comes after an onSuccess.
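
The effect of a cancellation that arrives after success can be sketched with plain java.util.concurrent, without RxJava. This is an analogy rather than RxJava's actual internals: it assumes that disposing a scheduled task ends in something like Future.cancel(true), which interrupts the thread running the task even when that thread is the one issuing the cancel. The class name LateCancelDemo and the suppressAfterSuccess flag are hypothetical; the flag stands in for the "suppress cancellation after onSuccess" idea.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class LateCancelDemo {
    // Runs a task that "succeeds" and is then cancelled from its own thread.
    // Returns whether the worker thread saw its interrupted flag set afterwards.
    static boolean interruptedAfterLateCancel(boolean suppressAfterSuccess)
            throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<Future<?>> self = new AtomicReference<>();
        CountDownLatch published = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean succeeded = new AtomicBoolean();
        AtomicBoolean flagSeen = new AtomicBoolean();
        try {
            Future<?> future = pool.submit(() -> {
                try {
                    published.await();   // wait until our own Future is visible
                    succeeded.set(true); // the task has delivered its result
                    // A coordinator now cancels the already-successful task.
                    if (!(suppressAfterSuccess && succeeded.get())) {
                        self.get().cancel(true); // interrupts the running thread
                    }
                    flagSeen.set(Thread.currentThread().isInterrupted());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });
            self.set(future);
            published.countDown();
            finished.await();
            return flagSeen.get();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptedAfterLateCancel(false)); // prints "true"
        System.out.println(interruptedAfterLateCancel(true));  // prints "false"
    }
}
```

With the guard enabled, the late cancel is skipped and the worker thread keeps a clean interrupt status, which is the behavior the downstream code in the report expects.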

@akarnokd
Member

Closing via #5027.

@ahmedre
Contributor Author

ahmedre commented Jan 30, 2017

thanks!
