Skip to content

Commit 6aee21d

Browse files
committed
StreamUtils.zip(…) now treats infinite streams correctly.
When an infinite Stream was handed into StreamUtils.zip(…) as first argument, the resulting stream was infinite, too, while inverting the argument order was limiting the resulting stream to the length of the finite one. This is now fixed by actually evaluating whether we can advance on both of the streams and shortcutting the process if that is not possible on either of the streams, limiting the processing of the overall Stream to the shorter of the two as already advertised in the Javadoc. Fixes #2426.
1 parent 3325dc5 commit 6aee21d

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.springframework.data.util;
2+
3+
import java.util.function.Consumer;
4+
5+
import org.springframework.lang.Nullable;
6+
7+
/**
8+
* A simple {@link Consumer} that captures the instance handed into it.
9+
*
10+
* @author Oliver Drotbohm
11+
* @since 2.4.12
12+
*/
13+
class Sink<T> implements Consumer<T> {
14+
15+
private T value;
16+
17+
/**
18+
* Returns the value captured.
19+
*
20+
* @return
21+
*/
22+
public T getValue() {
23+
return value;
24+
}
25+
26+
/*
27+
* (non-Javadoc)
28+
* @see java.util.function.Consumer#accept(java.lang.Object)
29+
*/
30+
@Override
31+
public void accept(@Nullable T t) {
32+
this.value = t;
33+
}
34+
}

src/main/java/org/springframework/data/util/StreamUtils.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,26 @@ public static <L, R, T> Stream<T> zip(Stream<L> left, Stream<R> right, BiFunctio
140140
@Override
141141
@SuppressWarnings("null")
142142
public boolean tryAdvance(Consumer<? super T> action) {
143-
return lefts.tryAdvance(left -> rights.tryAdvance(right -> action.accept(combiner.apply(left, right))));
144-
}
145143

144+
Sink<L> leftSink = new Sink<L>();
145+
Sink<R> rightSink = new Sink<R>();
146+
147+
boolean leftAdvance = lefts.tryAdvance(leftSink);
148+
149+
if (!leftAdvance) {
150+
return false;
151+
}
152+
153+
boolean rightAdvance = rights.tryAdvance(rightSink);
154+
155+
if (!rightAdvance) {
156+
return false;
157+
}
158+
159+
action.accept(combiner.apply(leftSink.getValue(), rightSink.getValue()));
160+
161+
return true;
162+
}
146163
}, parallel);
147164
}
148165
}

src/test/java/org/springframework/data/util/StreamUtilsTests.java

+9
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,13 @@ public void shouldConvertAnIteratorToAStream() {
4242

4343
assertThat(input).isEqualTo(output);
4444
}
45+
46+
@Test // #2426
47+
void combinesInfiniteStreamCorrectly() {
48+
49+
Stream<Long> indices = Stream.iterate(1L, n -> n + 1);
50+
Stream<String> lines = Stream.of("first line", "second line");
51+
52+
assertThat(StreamUtils.zip(indices, lines, (index, line) -> index + ":" + line).count()).isEqualTo(2);
53+
}
4554
}

0 commit comments

Comments
 (0)