如何延迟评估嵌套的flatMap



我正试图从两个潜在的无限流中变出一个笛卡尔乘积,然后通过limit()对其进行限制。

到目前为止,这(大致(是我的策略:

@Test
void flatMapIsLazy() {
Stream.of("a", "b", "c")
.flatMap(s -> Stream.of("x", "y")
.flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);
}

这行不通。

显然,我的第二个流在第一次在管道上使用时会在现场进行最终评估。它不会产生一个懒惰的流,然后我可以按照自己的节奏消耗。

我认为ReferencePipeline#flatMap的这段代码中的.forEach是罪魁祸首:

@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
}

我预计上面的代码会返回20个元素,看起来像:

a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx

但相反,它会与OutOfMemoryError崩溃,因为嵌套的flatMap中非常长的Stream会被急切地求值(??(,并用重复字符串的不必要副本填充我的内存。如果不是Integer.MAX_VALUE,而是提供值3,将相同的限制保持在20,则预期输出将为:

a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)

编辑:在这一点上,我刚刚推出了我自己的懒惰迭代器实现。尽管如此,我认为应该有一种方法来实现纯Streams。

编辑2:这在Java中被认为是一个错误通知单https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20

正如您已经写过的,这已经被认为是一个bug。也许,它将在未来的Java版本中得到解决。

但即使是现在也可能有一个解决方案。它不是很优雅,只有当外部流中的元素数量和限制足够小时,它才可能可行。但它将在这些限制下发挥作用。

首先,让我对您的示例进行一点修改,将外部flatMap转换为两个操作(一个map和一个具有标识的flatMap,只进行扁平化(:

Stream.of("a", "b", "c")
.map(s -> Stream.of("x", "y")
.flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.flatMap(s -> s)
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);

我们可以很容易地看到,每个内部流中不需要超过20个元素。因此,我们可以将每个流限制为这个数量的元素。这将起作用(您应该使用可变或常数作为限制(:

Stream.of("a", "b", "c")
.map(s -> Stream.of("x", "y")
.flatMap(sd -> IntStream.rangeClosed(0, Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.flatMap(s -> s.limit(20))            // limit each inner stream
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);

当然,这仍然会产生太多的中间结果,但在上述限制下,这可能不是什么大问题。

最新更新