Skip to content

Commit 0f10874

Browse files
committed
add @nonnull annotations to schedulers
1 parent b00bace commit 0f10874

22 files changed

+217
-128
lines changed

src/main/java/io/reactivex/Scheduler.java

+25-14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.TimeUnit;
1717

1818
import io.reactivex.annotations.Experimental;
19+
import io.reactivex.annotations.NonNull;
1920
import io.reactivex.disposables.Disposable;
2021
import io.reactivex.exceptions.Exceptions;
2122
import io.reactivex.functions.Function;
@@ -61,6 +62,7 @@ public static long clockDriftTolerance() {
6162
*
6263
* @return a Worker representing a serial queue of actions to be executed
6364
*/
65+
@NonNull
6466
public abstract Worker createWorker();
6567

6668
/**
@@ -69,7 +71,7 @@ public static long clockDriftTolerance() {
6971
* @return the 'current time'
7072
* @since 2.0
7173
*/
72-
public long now(TimeUnit unit) {
74+
public long now(@NonNull TimeUnit unit) {
7375
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
7476
}
7577

@@ -105,7 +107,8 @@ public void shutdown() {
105107
* @return the Disposable instance that let's one cancel this particular task.
106108
* @since 2.0
107109
*/
108-
public Disposable scheduleDirect(Runnable run) {
110+
@NonNull
111+
public Disposable scheduleDirect(@NonNull Runnable run) {
109112
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
110113
}
111114

@@ -122,7 +125,8 @@ public Disposable scheduleDirect(Runnable run) {
122125
* @return the Disposable that let's one cancel this particular delayed task.
123126
* @since 2.0
124127
*/
125-
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
128+
@NonNull
129+
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
126130
final Worker w = createWorker();
127131

128132
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
@@ -159,7 +163,8 @@ public void run() {
159163
* @return the Disposable that let's one cancel this particular delayed task.
160164
* @since 2.0
161165
*/
162-
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
166+
@NonNull
167+
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
163168
final Worker w = createWorker();
164169

165170
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
@@ -249,7 +254,8 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
249254
*/
250255
@SuppressWarnings("unchecked")
251256
@Experimental
252-
public <S extends Scheduler & Disposable> S when(Function<Flowable<Flowable<Completable>>, Completable> combine) {
257+
@NonNull
258+
public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
253259
return (S) new SchedulerWhen(combine, this);
254260
}
255261

@@ -268,7 +274,8 @@ public abstract static class Worker implements Disposable {
268274
* Runnable to schedule
269275
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
270276
*/
271-
public Disposable schedule(Runnable run) {
277+
@NonNull
278+
public Disposable schedule(@NonNull Runnable run) {
272279
return schedule(run, 0L, TimeUnit.NANOSECONDS);
273280
}
274281

@@ -287,7 +294,8 @@ public Disposable schedule(Runnable run) {
287294
* the time unit of {@code delayTime}
288295
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
289296
*/
290-
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
297+
@NonNull
298+
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
291299

292300
/**
293301
* Schedules a cancelable action to be executed periodically. This default implementation schedules
@@ -309,7 +317,8 @@ public Disposable schedule(Runnable run) {
309317
* the time unit of {@code period}
310318
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
311319
*/
312-
public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) {
320+
@NonNull
321+
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
313322
final SequentialDisposable first = new SequentialDisposable();
314323

315324
final SequentialDisposable sd = new SequentialDisposable(first);
@@ -337,7 +346,7 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi
337346
* @return the 'current time'
338347
* @since 2.0
339348
*/
340-
public long now(TimeUnit unit) {
349+
public long now(@NonNull TimeUnit unit) {
341350
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
342351
}
343352

@@ -346,15 +355,17 @@ public long now(TimeUnit unit) {
346355
* of this task has to happen (accounting for clock drifts).
347356
*/
348357
final class PeriodicTask implements Runnable {
358+
@NonNull
349359
final Runnable decoratedRun;
360+
@NonNull
350361
final SequentialDisposable sd;
351362
final long periodInNanoseconds;
352363
long count;
353364
long lastNowNanoseconds;
354365
long startInNanoseconds;
355366

356-
PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun,
357-
long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) {
367+
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
368+
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
358369
this.decoratedRun = decoratedRun;
359370
this.sd = sd;
360371
this.periodInNanoseconds = periodInNanoseconds;
@@ -395,12 +406,12 @@ public void run() {
395406
static class PeriodicDirectTask
396407
implements Runnable, Disposable {
397408
final Runnable run;
398-
409+
@NonNull
399410
final Worker worker;
400-
411+
@NonNull
401412
volatile boolean disposed;
402413

403-
PeriodicDirectTask(Runnable run, Worker worker) {
414+
PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
404415
this.run = run;
405416
this.worker = worker;
406417
}

src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.reactivex.internal.schedulers;
1717

1818
import io.reactivex.Scheduler;
19+
import io.reactivex.annotations.NonNull;
1920
import io.reactivex.disposables.*;
2021
import io.reactivex.internal.disposables.*;
2122

@@ -118,19 +119,22 @@ public ComputationScheduler(ThreadFactory threadFactory) {
118119
start();
119120
}
120121

122+
@NonNull
121123
@Override
122124
public Worker createWorker() {
123125
return new EventLoopWorker(pool.get().getEventLoop());
124126
}
125127

128+
@NonNull
126129
@Override
127-
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
130+
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
128131
PoolWorker w = pool.get().getEventLoop();
129132
return w.scheduleDirect(run, delay, unit);
130133
}
131134

135+
@NonNull
132136
@Override
133-
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
137+
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
134138
PoolWorker w = pool.get().getEventLoop();
135139
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
136140
}
@@ -188,16 +192,18 @@ public boolean isDisposed() {
188192
return disposed;
189193
}
190194

195+
@NonNull
191196
@Override
192-
public Disposable schedule(Runnable action) {
197+
public Disposable schedule(@NonNull Runnable action) {
193198
if (disposed) {
194199
return EmptyDisposable.INSTANCE;
195200
}
196201

197202
return poolWorker.scheduleActual(action, 0, null, serial);
198203
}
204+
@NonNull
199205
@Override
200-
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
206+
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
201207
if (disposed) {
202208
return EmptyDisposable.INSTANCE;
203209
}

src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.*;
1818

1919
import io.reactivex.Scheduler;
20+
import io.reactivex.annotations.NonNull;
2021
import io.reactivex.disposables.*;
2122
import io.reactivex.internal.disposables.*;
2223
import io.reactivex.internal.queue.MpscLinkedQueue;
@@ -29,21 +30,24 @@
2930
*/
3031
public final class ExecutorScheduler extends Scheduler {
3132

33+
@NonNull
3234
final Executor executor;
3335

3436
static final Scheduler HELPER = Schedulers.single();
3537

36-
public ExecutorScheduler(Executor executor) {
38+
public ExecutorScheduler(@NonNull Executor executor) {
3739
this.executor = executor;
3840
}
3941

42+
@NonNull
4043
@Override
4144
public Worker createWorker() {
4245
return new ExecutorWorker(executor);
4346
}
4447

48+
@NonNull
4549
@Override
46-
public Disposable scheduleDirect(Runnable run) {
50+
public Disposable scheduleDirect(@NonNull Runnable run) {
4751
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
4852
try {
4953
if (executor instanceof ExecutorService) {
@@ -60,8 +64,9 @@ public Disposable scheduleDirect(Runnable run) {
6064
}
6165
}
6266

67+
@NonNull
6368
@Override
64-
public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) {
69+
public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final TimeUnit unit) {
6570
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
6671
if (executor instanceof ScheduledExecutorService) {
6772
try {
@@ -87,8 +92,9 @@ public void run() {
8792
return dr;
8893
}
8994

95+
@NonNull
9096
@Override
91-
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
97+
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
9298
if (executor instanceof ScheduledExecutorService) {
9399
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
94100
try {
@@ -118,8 +124,9 @@ public ExecutorWorker(Executor executor) {
118124
this.queue = new MpscLinkedQueue<Runnable>();
119125
}
120126

127+
@NonNull
121128
@Override
122-
public Disposable schedule(Runnable run) {
129+
public Disposable schedule(@NonNull Runnable run) {
123130
if (disposed) {
124131
return EmptyDisposable.INSTANCE;
125132
}
@@ -143,8 +150,9 @@ public Disposable schedule(Runnable run) {
143150
return br;
144151
}
145152

153+
@NonNull
146154
@Override
147-
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
155+
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
148156
if (delay <= 0) {
149157
return schedule(run);
150158
}

src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.TimeUnit;
1717

1818
import io.reactivex.Scheduler;
19+
import io.reactivex.annotations.NonNull;
1920
import io.reactivex.disposables.*;
2021

2122
/**
@@ -45,22 +46,26 @@ private ImmediateThinScheduler() {
4546
// singleton class
4647
}
4748

49+
@NonNull
4850
@Override
49-
public Disposable scheduleDirect(Runnable run) {
51+
public Disposable scheduleDirect(@NonNull Runnable run) {
5052
run.run();
5153
return DISPOSED;
5254
}
5355

56+
@NonNull
5457
@Override
55-
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
58+
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
5659
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
5760
}
5861

62+
@NonNull
5963
@Override
60-
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
64+
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
6165
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
6266
}
6367

68+
@NonNull
6469
@Override
6570
public Worker createWorker() {
6671
return WORKER;
@@ -78,19 +83,22 @@ public boolean isDisposed() {
7883
return false; // dispose() has no effect
7984
}
8085

86+
@NonNull
8187
@Override
82-
public Disposable schedule(Runnable run) {
88+
public Disposable schedule(@NonNull Runnable run) {
8389
run.run();
8490
return DISPOSED;
8591
}
8692

93+
@NonNull
8794
@Override
88-
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
95+
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
8996
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
9097
}
9198

99+
@NonNull
92100
@Override
93-
public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) {
101+
public Disposable schedulePeriodically(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
94102
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
95103
}
96104
}

src/main/java/io/reactivex/internal/schedulers/IoScheduler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.reactivex.internal.schedulers;
1818

1919
import io.reactivex.Scheduler;
20+
import io.reactivex.annotations.NonNull;
2021
import io.reactivex.disposables.*;
2122
import io.reactivex.internal.disposables.EmptyDisposable;
2223

@@ -180,6 +181,7 @@ public void shutdown() {
180181
}
181182
}
182183

184+
@NonNull
183185
@Override
184186
public Worker createWorker() {
185187
return new EventLoopWorker(pool.get());
@@ -223,8 +225,9 @@ public boolean isDisposed() {
223225
return once.get();
224226
}
225227

228+
@NonNull
226229
@Override
227-
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
230+
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
228231
if (tasks.isDisposed()) {
229232
// don't schedule, we are unsubscribed
230233
return EmptyDisposable.INSTANCE;

src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.reactivex.internal.schedulers;
1818

1919
import io.reactivex.Scheduler;
20+
import io.reactivex.annotations.NonNull;
2021

2122
import java.util.concurrent.ThreadFactory;
2223

@@ -48,6 +49,7 @@ public NewThreadScheduler(ThreadFactory threadFactory) {
4849
this.threadFactory = threadFactory;
4950
}
5051

52+
@NonNull
5153
@Override
5254
public Worker createWorker() {
5355
return new NewThreadWorker(threadFactory);

0 commit comments

Comments
 (0)