From 014c1bed786b41de2a13eeef5676ef63630dffd0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 11 Sep 2019 12:51:45 +0200 Subject: [PATCH] 3.x: Fix takeLast(time) last events time window calculation. --- .../observable/ObservableTakeLastTimed.java | 3 ++- .../flowable/FlowableTakeLastTimedTest.java | 25 ++++++++++++++++++- .../ObservableTakeLastTimedTest.java | 25 ++++++++++++++++++- .../testsupport/TimesteppingScheduler.java | 13 +++++++--- 4 files changed, 59 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimed.java index 9db440047b..0b9e16e26f 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimed.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimed.java @@ -139,6 +139,7 @@ void drain() { final Observer a = downstream; final SpscLinkedArrayQueue q = queue; final boolean delayError = this.delayError; + final long timestampLimit = scheduler.now(unit) - time; for (;;) { if (cancelled) { @@ -171,7 +172,7 @@ void drain() { @SuppressWarnings("unchecked") T o = (T)q.poll(); - if ((Long)ts < scheduler.now(unit) - time) { + if ((Long)ts < timestampLimit) { continue; } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLastTimedTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLastTimedTest.java index 8d21caef59..745cdf01dd 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLastTimedTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLastTimedTest.java @@ -28,7 +28,7 @@ import io.reactivex.rxjava3.processors.PublishProcessor; import io.reactivex.rxjava3.schedulers.*; import io.reactivex.rxjava3.subscribers.TestSubscriber; -import io.reactivex.rxjava3.testsupport.TestHelper; +import io.reactivex.rxjava3.testsupport.*; public class FlowableTakeLastTimedTest extends RxJavaTest { @@ -338,4 +338,27 @@ public Publisher apply(Flowable f) throws Exception { public void badRequest() { TestHelper.assertBadRequestReported(PublishProcessor.create().takeLast(1, TimeUnit.SECONDS)); } + + @Test + public void lastWindowIsFixedInTime() { + TimesteppingScheduler scheduler = new TimesteppingScheduler(); + scheduler.stepEnabled = false; + + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .takeLast(2, TimeUnit.SECONDS, scheduler) + .test(); + + pp.onNext(1); + pp.onNext(2); + pp.onNext(3); + pp.onNext(4); + + scheduler.stepEnabled = true; + + pp.onComplete(); + + ts.assertResult(1, 2, 3, 4); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimedTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimedTest.java index a1c7c40314..a81fd1d9cb 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimedTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimedTest.java @@ -26,7 +26,7 @@ import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.schedulers.*; import io.reactivex.rxjava3.subjects.PublishSubject; -import io.reactivex.rxjava3.testsupport.TestHelper; +import io.reactivex.rxjava3.testsupport.*; public class ObservableTakeLastTimedTest extends RxJavaTest { @@ -277,4 +277,27 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void lastWindowIsFixedInTime() { + TimesteppingScheduler scheduler = new TimesteppingScheduler(); + scheduler.stepEnabled = false; + + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .takeLast(2, TimeUnit.SECONDS, scheduler) + .test(); + + ps.onNext(1); + ps.onNext(2); + ps.onNext(3); + ps.onNext(4); + + scheduler.stepEnabled = true; + + ps.onComplete(); + + to.assertResult(1, 2, 3, 4); + } } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TimesteppingScheduler.java b/src/test/java/io/reactivex/rxjava3/testsupport/TimesteppingScheduler.java index bf1da1237c..cb9a81eca0 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TimesteppingScheduler.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TimesteppingScheduler.java @@ -22,7 +22,7 @@ * Basic scheduler that produces an ever increasing {@link #now(TimeUnit)} value. * Use this scheduler only as a time source! */ -public class TimesteppingScheduler extends Scheduler { +public final class TimesteppingScheduler extends Scheduler { final class TimesteppingWorker extends Worker { @Override @@ -42,11 +42,13 @@ public Disposable schedule(Runnable run, long delay, TimeUnit unit) { @Override public long now(TimeUnit unit) { - return time++; + return TimesteppingScheduler.this.now(unit); } } - long time; + public long time; + + public boolean stepEnabled = true; @Override public Worker createWorker() { @@ -55,6 +57,9 @@ public Worker createWorker() { @Override public long now(TimeUnit unit) { - return time++; + if (stepEnabled) { + return time++; + } + return time; } }