Stream.parallel() 不会更新拆分器的特性吗?



这个问题是基于这个问题的答案 Stream.of和IntStream.range有什么区别?

由于IntStream.range生成已排序的流,因此以下代码的输出只会生成0的输出:

IntStream.range(0, 4)
.peek(System.out::println)
.sorted()
.findFirst();

此外,分离器将具有SORTED特性。下面的代码返回true

System.out.println(
IntStream.range(0, 4)
.spliterator()
.hasCharacteristics(Spliterator.SORTED)
);

现在,如果我在第一个代码中引入一个parallel(),那么正如预期的那样,输出将包含从03的所有 4 个数字,但以随机顺序,因为流不再排序由于parallel()

IntStream.range(0, 4)
.parallel()
.peek(System.out::println)
.sorted()
.findFirst();

这将产生如下内容:(以任何随机顺序)

2
0
1
3

因此,我希望SORTED属性已因parallel()而被删除。但是,下面的代码也返回true

System.out.println(
IntStream.range(0, 4)
.parallel()
.spliterator()
.hasCharacteristics(Spliterator.SORTED)
);

为什么parallel()不改变SORTED属性?既然所有四个数字都是打印的,Java如何意识到即使SORTED属性仍然存在,流也没有排序?

具体如何做到这一点在很大程度上是一个实现细节。您必须深入挖掘源代码才能真正了解原因。基本上,并行和顺序管道的处理方式不同。查看AbstractPipeline.evaluate,它检查isParallel(),然后根据管道是否并行执行不同操作。

return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

如果您随后查看SortedOps.OfInt,您将看到它覆盖了两种方法:

@Override
public Sink<Integer> opWrapSink(int flags, Sink sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.SORTED.isKnown(flags))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedIntSortingSink(sink);
else
return new IntSortingSink(sink);
}
@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator,
IntFunction<Integer[]> generator) {
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
return helper.evaluate(spliterator, false, generator);
}
else {
Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
int[] content = n.asPrimitiveArray();
Arrays.parallelSort(content);
return Nodes.node(content);
}
}

如果是顺序管道,最终将调用opWrapSink,当它是并行流时,将调用opEvaluateParallel(顾名思义)。请注意,如果管道已排序(只是返回不变),opWrapSink不会对给定接收器执行任何操作,但opEvaluateParallel始终评估拆分器。

另请注意,并行性和排序性并不相互排斥。您可以拥有具有这些特征的任意组合的流。

"排序"是Spliterator的特征。从技术上讲,这不是Stream的特征(就像"并行"一样)。当然,parallel可以使用具有全新特征的全新拆分器(从原始拆分器获取元素)创建一个流,但是当您可以重用相同的拆分器时,为什么要这样做呢?想象一下,在任何情况下,您都必须以不同的方式处理并行和顺序流。

您需要退后一步,考虑如何解决此类问题,考虑到ForkJoinPool用于并行流并且它基于工作窃取工作。如果您也知道Spliterator的工作原理,那将非常有帮助。一些细节在这里。

你有一个特定的 Stream,你把它"拆分"(非常简化)成更小的部分,并将所有这些部分交给一个ForkJoinPool执行。所有这些部分都是由单个线程独立处理的。由于我们在这里谈论的是线程,显然没有事件的顺序,事情是随机发生的(这就是为什么你看到一个随机的顺序输出)。

如果您的流保留了订单,则终端操作也应该保留它。因此,当中间操作以任何顺序执行时,您的终端操作(如果到该点的流是有序的)将以有序的方式处理元素。稍微简化一下:

System.out.println(
IntStream.of(1,2,3)
.parallel()
.map(x -> {System.out.println(x * 2); return x * 2;})
.boxed()
.collect(Collectors.toList()));

map将以未知的顺序处理元素(请记住,ForkJoinPool和线程),但collect将按"从左到右">的顺序接收元素。


现在,如果我们将其推断为您的示例:当您调用parallel时,流被分成小块并进行处理。例如,看看这是如何拆分的(一次)。

Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
.parallel()
.boxed()
.sorted()
.spliterator()
.trySplit(); // trySplit is invoked internally on parallel
spliterator.forEachRemaining(System.out::println);

在我的机器上,它可以打印1,2,3,4.这意味着内部实现将流拆分为两个Spliteratorleftrightleft[1, 2, 3, 4],权利有[5, 6, 7, 8].但事实并非如此,这些Spliterator可以进一步拆分。例如:

Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
.parallel()
.boxed()
.sorted()
.spliterator()
.trySplit()
.trySplit()
.trySplit();
spliterator.forEachRemaining(System.out::println);

如果你尝试再次调用trySplit,你会得到一个null——意思是,就是这样,我不能再分裂了。

因此,您的流:IntStream.range(0, 4)将被拆分为4个拆分器。所有这些都通过线程单独处理。如果您的第一个线程知道它当前工作的这个Spliterator是"最左边的线程",就是这样!其余线程甚至不需要开始工作 - 结果是已知的。

另一方面,可能是这个具有"最左边"元素的Spliterator只是最后开始的。因此,前三个可能已经完成了它们的工作(因此在您的示例中调用了peek),但它们不会"产生"所需的结果。

事实上,这就是内部的做法。您不需要了解代码 - 但流和方法名称应该很明显。