这个问题是基于这个问题的答案 Stream.of和IntStream.range有什么区别?
由于IntStream.range
生成已排序的流,因此以下代码的输出只会生成0
的输出:
IntStream.range(0, 4)
.peek(System.out::println)
.sorted()
.findFirst();
此外,分离器将具有SORTED
特性。下面的代码返回true
:
System.out.println(
IntStream.range(0, 4)
.spliterator()
.hasCharacteristics(Spliterator.SORTED)
);
现在,如果我在第一个代码中引入一个parallel()
,那么正如预期的那样,输出将包含从0
到3
的所有 4 个数字,但以随机顺序,因为流不再排序由于parallel()
。
IntStream.range(0, 4)
.parallel()
.peek(System.out::println)
.sorted()
.findFirst();
这将产生如下内容:(以任何随机顺序)
2
0
1
3
因此,我希望SORTED
属性已因parallel()
而被删除。但是,下面的代码也返回true
。
System.out.println(
IntStream.range(0, 4)
.parallel()
.spliterator()
.hasCharacteristics(Spliterator.SORTED)
);
为什么parallel()
不改变SORTED
属性?既然所有四个数字都是打印的,Java如何意识到即使SORTED
属性仍然存在,流也没有排序?
具体如何做到这一点在很大程度上是一个实现细节。您必须深入挖掘源代码才能真正了解原因。基本上,并行和顺序管道的处理方式不同。查看AbstractPipeline.evaluate
,它检查isParallel()
,然后根据管道是否并行执行不同操作。
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
如果您随后查看SortedOps.OfInt
,您将看到它覆盖了两种方法:
@Override
public Sink<Integer> opWrapSink(int flags, Sink sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.SORTED.isKnown(flags))
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedIntSortingSink(sink);
else
return new IntSortingSink(sink);
}
@Override
public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator,
IntFunction<Integer[]> generator) {
if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
return helper.evaluate(spliterator, false, generator);
}
else {
Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
int[] content = n.asPrimitiveArray();
Arrays.parallelSort(content);
return Nodes.node(content);
}
}
如果是顺序管道,最终将调用opWrapSink
,当它是并行流时,将调用opEvaluateParallel
(顾名思义)。请注意,如果管道已排序(只是返回不变),opWrapSink
不会对给定接收器执行任何操作,但opEvaluateParallel
始终评估拆分器。
另请注意,并行性和排序性并不相互排斥。您可以拥有具有这些特征的任意组合的流。
"排序"是Spliterator
的特征。从技术上讲,这不是Stream
的特征(就像"并行"一样)。当然,parallel
可以使用具有全新特征的全新拆分器(从原始拆分器获取元素)创建一个流,但是当您可以重用相同的拆分器时,为什么要这样做呢?想象一下,在任何情况下,您都必须以不同的方式处理并行和顺序流。
您需要退后一步,考虑如何解决此类问题,考虑到ForkJoinPool
用于并行流并且它基于工作窃取工作。如果您也知道Spliterator
的工作原理,那将非常有帮助。一些细节在这里。
你有一个特定的 Stream,你把它"拆分"(非常简化)成更小的部分,并将所有这些部分交给一个ForkJoinPool
执行。所有这些部分都是由单个线程独立处理的。由于我们在这里谈论的是线程,显然没有事件的顺序,事情是随机发生的(这就是为什么你看到一个随机的顺序输出)。
如果您的流保留了订单,则终端操作也应该保留它。因此,当中间操作以任何顺序执行时,您的终端操作(如果到该点的流是有序的)将以有序的方式处理元素。稍微简化一下:
System.out.println(
IntStream.of(1,2,3)
.parallel()
.map(x -> {System.out.println(x * 2); return x * 2;})
.boxed()
.collect(Collectors.toList()));
map
将以未知的顺序处理元素(请记住,ForkJoinPool
和线程),但collect
将按"从左到右">的顺序接收元素。
现在,如果我们将其推断为您的示例:当您调用parallel
时,流被分成小块并进行处理。例如,看看这是如何拆分的(一次)。
Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
.parallel()
.boxed()
.sorted()
.spliterator()
.trySplit(); // trySplit is invoked internally on parallel
spliterator.forEachRemaining(System.out::println);
在我的机器上,它可以打印1,2,3,4
.这意味着内部实现将流拆分为两个Spliterator
:left
和right
。left
有[1, 2, 3, 4]
,权利有[5, 6, 7, 8]
.但事实并非如此,这些Spliterator
可以进一步拆分。例如:
Spliterator<Integer> spliterator =
IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
.parallel()
.boxed()
.sorted()
.spliterator()
.trySplit()
.trySplit()
.trySplit();
spliterator.forEachRemaining(System.out::println);
如果你尝试再次调用trySplit
,你会得到一个null
——意思是,就是这样,我不能再分裂了。
因此,您的流:IntStream.range(0, 4)
将被拆分为4个拆分器。所有这些都通过线程单独处理。如果您的第一个线程知道它当前工作的这个Spliterator
是"最左边的线程",就是这样!其余线程甚至不需要开始工作 - 结果是已知的。
另一方面,可能是这个具有"最左边"元素的Spliterator
只是最后开始的。因此,前三个可能已经完成了它们的工作(因此在您的示例中调用了peek
),但它们不会"产生"所需的结果。
事实上,这就是内部的做法。您不需要了解代码 - 但流和方法名称应该很明显。