ReactiveX有一个非常整洁的运算符,称为Scan,它类似于reduce,只是它发出每个中间累加器。
如何使用 Java Streams 实现此目的?Stream#reduce
不是我想要的,因为它返回T
:
T reduce(T identity, BinaryOperator<T> accumulator)
我想要的是返回Stream<T>
,流的每个项目都是每次调用accumulator
时返回的T
:
Stream.of(1, 2, 3)
.myScan(0, (accumulator, i) -> accumulator + i)
.collect(Collectors.toList()); // [1, 3, 6]
我可以做一些蹩脚的事情,比如减少List
,然后转换回Stream
,但这很丑陋。
试试 StreamEx。我认为它提供了您想要的提取 API:
List<Integer> result = StreamEx.of(1, 2, 3).scanLeft((i, j) -> i + j);
System.out.println(result); // [1, 3, 6]
Stream 不支持此操作。 您可以将其转换为迭代器,在那里执行,然后转换回流,但是流没有内置任何功能来执行此操作。
java-stream最大的问题是它被高估了,并被理解为与任何Collection略有相关的一切的魔术子弹解决方案。
此 API 是为支持元素的顺序处理而构建的。不存在任何类似流的方法执行下面的代码片段。此外,此 API 不容易扩展。
这是你在说的:
List<Integer> list = new ArrayList<>();
Stream.of(1, 2, 3).reduce(0, (t, u) -> {
final Integer s = t + u;
list.add(s);
return s;
});
System.out.println(list); // prints [1, 3, 6]
这是使用AtomicInteger
的解决方法,它允许您在执行map(..)
后继续流式传输:
AtomicInteger sum = new AtomicInteger(0);
List<Integer> list = Stream.of(1, 2, 3)
.map(i -> sum.addAndGet(i))
.collect(Collectors.toList());
System.out.println(list); // prints [1, 3, 6]
有时你必须使用不同的方式。
你可以尝试从其他不鼓励的副作用中受益。由于您的目标是处理流并继续处理流,因此我们可以使用map
方法实现此目的。下面的代码利用副作用来做到这一点。
public class Scan {
public static void main(String[] args) {
Accumulator accumulator = new Accumulator();
Stream<Integer> originalStream = Stream.of(1, 2, 3);
Stream<Integer> scannedStream = originalStream
.map(i -> accumulator.accumulate(i));
List<Integer> list = scannedStream
.collect(Collectors.toList()); // [1, 3, 6]
for (Integer i : list) {
System.out.println(i);
}
}
private static class Accumulator {
private int value;
public int accumulate(int i) {
return value += i;
}
}
}
Accumulator
可以替换为用于不同扫描操作的Function
。
但是您必须了解限制和顾虑(例如线程安全(。