diff --git a/docs/Backpressure.md b/docs/Backpressure.md
index 8feec0d487..bfe90330bb 100644
--- a/docs/Backpressure.md
+++ b/docs/Backpressure.md
@@ -34,25 +34,25 @@ The following diagrams show how you could use each of these operators on the bur
The `sample` operator periodically "dips" into the sequence and emits only the most recently emitted item during each dip:
-````groovy
+```java
Observable burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
-````
+```
### throttleFirst
The `throttleFirst` operator is similar, but emits not the most recently emitted item, but the first item that was emitted after the previous "dip":
-````groovy
+```java
Observable burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
-````
+```
### debounce (or throttleWithTimeout)
The `debounce` operator emits only those items from the source Observable that are not followed by another item within a specified duration:
-````groovy
+```java
Observable burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
-````
+```
## Buffers and windows
@@ -65,14 +65,14 @@ The following diagrams show how you could use each of these operators on the bur
You could, for example, close and emit a buffer of items from the bursty Observable periodically, at a regular interval of time:
-````groovy
+```java
Observable> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
-````
+```
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the `debounce` operator to emit a buffer closing indicator to the `buffer` operator:
-````groovy
+```java
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable burstyMulticast = bursty.publish().refCount();
@@ -87,16 +87,16 @@ Observable> burstyBuffered = burstyMulticast.buffer(burstyDebounce
`window` is similar to `buffer`. One variant of `window` allows you to periodically emit Observable windows of items at a regular interval of time:
-````groovy
+```java
Observable> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
````
You could also choose to emit a new window each time you have collected a particular number of items from the source Observable:
-````groovy
+```java
Observable> burstyWindowed = bursty.window(5);
-````
+```
# Callstack blocking as a flow-control alternative to backpressure
@@ -110,8 +110,8 @@ When you subscribe to an `Observable` with a `Subscriber`, you can request react
Then, after handling this item (or these items) in `onNext()`, you can call `request()` again to instruct the `Observable` to emit another item (or items). Here is an example of a `Subscriber` that requests one item at a time from `someObservable`:
-````java
-someObservable.subscribe(new Subscriber() {
+```java
+someObservable.subscribe(new Subscriber() {
@Override
public void onStart() {
request(1);
@@ -134,7 +134,7 @@ someObservable.subscribe(new Subscriber() {
request(1);
}
});
-````
+```
You can pass a magic number to `request`, `request(Long.MAX_VALUE)`, to disable reactive pull backpressure and to ask the Observable to emit items at its own pace. `request(0)` is a legal call, but has no effect. Passing values less than zero to `request` will cause an exception to be thrown.
diff --git a/docs/Combining-Observables.md b/docs/Combining-Observables.md
index bcc19f6b0f..fcde91b76b 100644
--- a/docs/Combining-Observables.md
+++ b/docs/Combining-Observables.md
@@ -2,13 +2,13 @@ This section explains operators you can use to combine multiple Observables.
# Outline
-- [`combineLatest`](#combineLatest)
+- [`combineLatest`](#combinelatest)
- [`join` and `groupJoin`](#joins)
- [`merge`](#merge)
-- [`mergeDelayError`](#mergeDelayError)
+- [`mergeDelayError`](#mergedelayerror)
- [`rxjava-joins`](#rxjava-joins)
-- [`startWith`](#startWith)
-- [`switchOnNext`](#switchOnNext)
+- [`startWith`](#startwith)
+- [`switchOnNext`](#switchonnext)
- [`zip`](#zip)
## startWith
diff --git a/docs/How-To-Use-RxJava.md b/docs/How-To-Use-RxJava.md
index 16d156caa9..ad309496b9 100644
--- a/docs/How-To-Use-RxJava.md
+++ b/docs/How-To-Use-RxJava.md
@@ -96,9 +96,9 @@ You use the Observable [`just( )`](http://reactivex.io/documentation/operators
Observable o = Observable.from("a", "b", "c");
def list = [5, 6, 7, 8]
-Observable o = Observable.from(list);
+Observable o2 = Observable.from(list);
-Observable o = Observable.just("one object");
+Observable o3 = Observable.just("one object");
```
These converted Observables will synchronously invoke the [`onNext( )`](Observable#onnext-oncompleted-and-onerror) method of any subscriber that subscribes to them, for each item to be emitted by the Observable, and will then invoke the subscriber’s [`onCompleted( )`](Observable#onnext-oncompleted-and-onerror) method.
diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md
index 7b6e57666e..e8486564b1 100644
--- a/docs/Writing-operators-for-2.0.md
+++ b/docs/Writing-operators-for-2.0.md
@@ -1,7 +1,7 @@
##### Table of contents
- [Introduction](#introduction)
- - [Warning on internal components](warning-on-internal-components)
+ - [Warning on internal components](#warning-on-internal-components)
- [Atomics, serialization, deferred actions](#atomics-serialization-deferred-actions)
- [Field updaters and Android](#field-updaters-and-android)
- [Request accounting](#request-accounting)
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
index 6d5bb44b23..836957192e 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
@@ -86,48 +86,42 @@ public void setup() {
singleFlatMapPublisher = Single.just(1).flatMapPublisher(new Function>() {
@Override
- public Publisher extends Integer> apply(Integer v)
- throws Exception {
+ public Publisher extends Integer> apply(Integer v) {
return arrayFlowable;
}
});
singleFlatMapHidePublisher = Single.just(1).flatMapPublisher(new Function>() {
@Override
- public Publisher extends Integer> apply(Integer v)
- throws Exception {
+ public Publisher extends Integer> apply(Integer v) {
return arrayFlowableHide;
}
});
singleFlattenAsPublisher = Single.just(1).flattenAsFlowable(new Function>() {
@Override
- public Iterable extends Integer> apply(Integer v)
- throws Exception {
+ public Iterable extends Integer> apply(Integer v) {
return list;
}
});
maybeFlatMapPublisher = Maybe.just(1).flatMapPublisher(new Function>() {
@Override
- public Publisher extends Integer> apply(Integer v)
- throws Exception {
+ public Publisher extends Integer> apply(Integer v) {
return arrayFlowable;
}
});
maybeFlatMapHidePublisher = Maybe.just(1).flatMapPublisher(new Function>() {
@Override
- public Publisher extends Integer> apply(Integer v)
- throws Exception {
+ public Publisher extends Integer> apply(Integer v) {
return arrayFlowableHide;
}
});
maybeFlattenAsPublisher = Maybe.just(1).flattenAsFlowable(new Function>() {
@Override
- public Iterable extends Integer> apply(Integer v)
- throws Exception {
+ public Iterable extends Integer> apply(Integer v) {
return list;
}
});
@@ -140,48 +134,42 @@ public Iterable extends Integer> apply(Integer v)
singleFlatMapObservable = Single.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v)
- throws Exception {
+ public Observable extends Integer> apply(Integer v) {
return arrayObservable;
}
});
singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v)
- throws Exception {
+ public Observable extends Integer> apply(Integer v) {
return arrayObservableHide;
}
});
singleFlattenAsObservable = Single.just(1).flattenAsObservable(new Function>() {
@Override
- public Iterable extends Integer> apply(Integer v)
- throws Exception {
+ public Iterable extends Integer> apply(Integer v) {
return list;
}
});
maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v)
- throws Exception {
+ public Observable extends Integer> apply(Integer v) {
return arrayObservable;
}
});
maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v)
- throws Exception {
+ public Observable extends Integer> apply(Integer v) {
return arrayObservableHide;
}
});
maybeFlattenAsObservable = Maybe.just(1).flattenAsObservable(new Function>() {
@Override
- public Iterable extends Integer> apply(Integer v)
- throws Exception {
+ public Iterable extends Integer> apply(Integer v) {
return list;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/CallableAsyncPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/CallableAsyncPerf.java
index 4afa54ecc8..8098b1aaf4 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/CallableAsyncPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/CallableAsyncPerf.java
@@ -68,7 +68,7 @@ public void setup() {
Callable c = new Callable() {
@Override
- public Integer call() throws Exception {
+ public Integer call() {
return 1;
}
};
@@ -120,71 +120,71 @@ public void subscribeOnFlowable(Blackhole bh) {
@Benchmark
public void observeOnFlowable(Blackhole bh) {
observeOnFlowable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void pipelineFlowable(Blackhole bh) {
pipelineFlowable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void subscribeOnObservable(Blackhole bh) {
subscribeOnObservable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void observeOnObservable(Blackhole bh) {
observeOnObservable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void pipelineObservable(Blackhole bh) {
pipelineObservable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void observeOnSingle(Blackhole bh) {
observeOnSingle.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void subscribeOnSingle(Blackhole bh) {
subscribeOnSingle.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void pipelineSingle(Blackhole bh) {
pipelineSingle.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void observeOnCompletable(Blackhole bh) {
observeOnCompletable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void subscribeOnCompletable(Blackhole bh) {
subscribeOnCompletable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void pipelineCompletable(Blackhole bh) {
pipelineCompletable.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void observeOnMaybe(Blackhole bh) {
observeOnMaybe.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void subscribeOnMaybe(Blackhole bh) {
subscribeOnMaybe.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
@Benchmark
public void pipelineMaybe(Blackhole bh) {
pipelineMaybe.subscribeWith(new PerfAsyncConsumer(bh)).await(1);
- };
+ }
}
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/EachTypeFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/EachTypeFlatMapPerf.java
index e05921c531..37a4f196e8 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/EachTypeFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/EachTypeFlatMapPerf.java
@@ -114,11 +114,11 @@ public void nbpRangeMapRange(Blackhole bh) {
@Benchmark
public void singleJust(Blackhole bh) {
- singleJust.subscribe(new LatchedSingleObserver(bh));
+ singleJust.subscribe(new LatchedSingleObserver<>(bh));
}
@Benchmark
public void singleJustMapJust(Blackhole bh) {
- singleJustMapJust.subscribe(new LatchedSingleObserver(bh));
+ singleJustMapJust.subscribe(new LatchedSingleObserver<>(bh));
}
}
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/FlatMapJustPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/FlatMapJustPerf.java
index 8541f92834..2a1fd7539c 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/FlatMapJustPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/FlatMapJustPerf.java
@@ -41,14 +41,14 @@ public void setup() {
flowable = Flowable.fromArray(array).flatMap(new Function>() {
@Override
- public Publisher apply(Integer v) throws Exception {
+ public Publisher apply(Integer v) {
return Flowable.just(v);
}
});
observable = Observable.fromArray(array).flatMap(new Function>() {
@Override
- public Observable apply(Integer v) throws Exception {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/FlattenCrossMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/FlattenCrossMapPerf.java
index cfc5a1af97..866fe5de77 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/FlattenCrossMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/FlattenCrossMapPerf.java
@@ -47,14 +47,14 @@ public void setup() {
flowable = Flowable.fromArray(array).flatMapIterable(new Function>() {
@Override
- public Iterable apply(Integer v) throws Exception {
+ public Iterable apply(Integer v) {
return list;
}
});
observable = Observable.fromArray(array).flatMapIterable(new Function>() {
@Override
- public Iterable apply(Integer v) throws Exception {
+ public Iterable apply(Integer v) {
return list;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/FlattenJustPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/FlattenJustPerf.java
index bb2394a053..d2ac9aa5b7 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/FlattenJustPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/FlattenJustPerf.java
@@ -44,14 +44,14 @@ public void setup() {
flowable = Flowable.fromArray(array).flatMapIterable(new Function>() {
@Override
- public Iterable apply(Integer v) throws Exception {
+ public Iterable apply(Integer v) {
return singletonList;
}
});
observable = Observable.fromArray(array).flatMapIterable(new Function>() {
@Override
- public Iterable apply(Integer v) throws Exception {
+ public Iterable apply(Integer v) {
return singletonList;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/FlattenRangePerf.java b/src/jmh/java/io/reactivex/rxjava3/core/FlattenRangePerf.java
index 663fd8cdde..a3195b760d 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/FlattenRangePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/FlattenRangePerf.java
@@ -44,14 +44,14 @@ public void setup() {
flowable = Flowable.fromArray(array).flatMapIterable(new Function>() {
@Override
- public Iterable apply(Integer v) throws Exception {
+ public Iterable apply(Integer v) {
return list;
}
});
observable = Observable.fromArray(array).flatMapIterable(new Function>() {
@Override
- public Iterable apply(Integer v) throws Exception {
+ public Iterable apply(Integer v) {
return list;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/FlowableFlatMapCompletableAsyncPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/FlowableFlatMapCompletableAsyncPerf.java
index 83f2d4c4f4..e9494e97df 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/FlowableFlatMapCompletableAsyncPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/FlowableFlatMapCompletableAsyncPerf.java
@@ -45,7 +45,7 @@ public class FlowableFlatMapCompletableAsyncPerf implements Action {
Flowable