Java Stream equivalent of ReactiveX Observable#scan



ReactiveX有一个非常整洁的运算符,称为Scan,它类似于reduce,只是它发出每个中间累加器。

如何使用 Java Streams 实现此目的?Stream#reduce不是我想要的,因为它返回T

T reduce(T identity, BinaryOperator<T> accumulator)

我想要的是返回Stream<T>,流的每个项目都是每次调用accumulator时返回的T

Stream.of(1, 2, 3)
.myScan(0, (accumulator, i) -> accumulator + i)
.collect(Collectors.toList()); // [1, 3, 6]

我可以做一些蹩脚的事情,比如减少List,然后转换回Stream,但这很丑陋。

试试 StreamEx。我认为它提供了您想要的提取 API:

List<Integer> result = StreamEx.of(1, 2, 3).scanLeft((i, j) -> i + j);
System.out.println(result); // [1, 3, 6]

Stream 不支持此操作。 您可以将其转换为迭代器,在那里执行,然后转换回流,但是流没有内置任何功能来执行此操作。

java-stream最大的问题是它被高估了,并被理解为与任何Collection略有相关的一切的魔术子弹解决方案。

此 API 是为支持元素的顺序处理而构建的。不存在任何类似流的方法执行下面的代码片段。此外,此 API 不容易扩展。

这是你在说的:

List<Integer> list = new ArrayList<>();
Stream.of(1, 2, 3).reduce(0, (t, u) -> {
final Integer s = t + u;
list.add(s);
return s;
});
System.out.println(list);   // prints [1, 3, 6]

这是使用AtomicInteger的解决方法,它允许您在执行map(..)后继续流式传输:

AtomicInteger sum = new AtomicInteger(0);
List<Integer> list = Stream.of(1, 2, 3)
.map(i -> sum.addAndGet(i))
.collect(Collectors.toList()); 
System.out.println(list);  // prints [1, 3, 6]

有时你必须使用不同的方式。

你可以尝试从其他不鼓励的副作用中受益。由于您的目标是处理流并继续处理流,因此我们可以使用map方法实现此目的。下面的代码利用副作用来做到这一点。

public class Scan {
public static void main(String[] args) {
Accumulator accumulator = new Accumulator();
Stream<Integer> originalStream = Stream.of(1, 2, 3);
Stream<Integer> scannedStream = originalStream
.map(i -> accumulator.accumulate(i));
List<Integer> list = scannedStream
.collect(Collectors.toList()); // [1, 3, 6]
for (Integer i : list) {
System.out.println(i);
}
}
private static class Accumulator {
private int value;
public int accumulate(int i) {
return value += i;
}
}
}

Accumulator可以替换为用于不同扫描操作的Function

但是您必须了解限制和顾虑(例如线程安全(。

最新更新