使用.stream().parallel()
的最佳实践是什么?
例如,如果您有一堆阻塞 I/O 调用,并且想要检查是否.anyMatch(...)
,则并行执行此操作似乎是明智之举。
示例代码:
public boolean hasAnyRecentReference(JobId jobid) {
<...>
return pendingJobReferences.stream()
.parallel()
.anyMatch(pendingRef -> {
JobReference readReference = pendingRef.sync();
Duration referenceAge = timeService.timeSince(readReference.creationTime());
return referenceAge.lessThan(maxReferenceAge)
});
}
乍一看,这看起来很明智,因为我们可以同时执行多个阻塞读取,因为我们只关心任何匹配的读取,而不是一个接一个地检查(因此,如果每次读取需要 50 毫秒,我们只需要等待(50 毫秒*预期数量非最近引用)/numThreads)。
在生产环境中引入此代码是否会对代码库的其他部分产生任何不可预见的性能影响?
编辑:正如@edharned指出的那样.parallel()
现在使用CountedCompleter
而不是调用.join()
,这有它自己的问题,Ed 在What is currently being done?
部分的 http://coopsoft.com/ar/Calamity2Article.html 中也解释了这个问题。
我相信下面的信息对于理解为什么分叉连接框架很棘手以及建议.parallel()
结论中的替代方案仍然相关仍然有用。
虽然代码的精神是正确的,但实际代码可能会对所有使用.parallel()
的代码产生系统范围的影响,即使这根本不明显。
不久前,我发现了一篇文章,建议不要这样做:https://dzone.com/articles/think-twice-using-java-8,但直到最近我才深入挖掘。
这些是我做一堆阅读后的想法:
- Java 中的
.parallel()
使用ForkJoinPool.commonPool()
,这是一个由所有流共享的单例ForkJoinPool
(ForkJoinPool.commonPool()
是一种公共静态方法,所以理论上其他库/部分代码可以使用它) -
工作ForkJoinPool
实现了工作窃取,除了共享队列之外,还具有每线程队列- 窃取意味着当线程空闲时,它会寻找更多的工作要做
- 最初我想:根据这个定义,
cached
线程池不是也做工作窃取吗(即使一些参考称其为缓存线程池的工作共享)? 事实证明,使用空闲一词时似乎有一些术语模糊:
- 在
cached
线程池中,线程只有在完成其任务后才处于空闲状态。如果它在等待阻止调用时被阻止,它不会变为空闲状态 在
forkjoin
线程池中,线程在完成其任务或对子任务调用.join()
方法(这是一种特殊的阻塞调用)时处于空闲状态。在子任务上调用
.join()
时,线程在等待该子任务完成时变为空闲状态。空闲时,它将尝试执行任何其他可用任务,即使它在另一个线程的队列中(它会窃取工作)。[这是重要的一点]一旦找到另一个要执行的任务,它必须在恢复其原始执行之前完成它,即使它正在等待的子任务在线程仍在执行被盗任务时完成。
[这也很重要]此工作窃取行为仅适用于调用
.join()
的线程。如果线程在其他内容(如 I/O)上被阻塞,它将变为空闲状态(即它不会窃取工作)。
- 在
-
Java 流不允许你提供定制的 ForkJoinPool,但 https://github.com/amaembo/streamex
我花了一段时间才理解2.3.2
的含义,所以我将举一个简单的例子来帮助说明这个问题:
注意:这些是虚拟示例,但是通过使用流,您可能会进入等效的情况而没有意识到这一点,这些流在内部执行分叉连接的东西。
此外,我将使用极其简化的伪代码,这些代码仅用于说明 .parallel() 问题,但不一定有意义。
假设我们正在实现合并排序
merge_sort(list):
left, right = split(list)
leftTask = mergeSortTask(left).fork()
rightTask = mergeSortTaks(right).fork()
return merge(leftTask.join(), rightTask.join())
现在假设我们有另一段代码执行以下操作:
dummy_collect_results(queriesIds):
pending_results = []
for id in queriesIds:
pending_results += longBlockingIOTask(id).fork()
// do more stuff
这里发生了什么?
当您编写合并排序代码时,您认为排序调用不执行任何 I/O,因此它们的性能应该非常确定,对吗?
右。您可能意想不到的是,由于dummy_collect_results
方法创建了一堆长时间运行和阻塞的子任务,当执行合并排序任务的线程在.join()
上阻塞,等待子任务完成时,它们可能会开始执行其中一个长阻塞子任务。
这很糟糕,因为如上所述,一旦长时间阻塞(在 I/O 上,而不是.join()
调用,因此线程不会再次空闲)被盗,它都必须完成,无论线程等待的子任务是否.join()
在阻塞 I/O 时完成。
这使得合并排序任务的执行不再是确定性的,因为执行这些任务的线程最终可能会窃取完全位于其他地方的代码生成的 I/O 密集型任务。
这也非常可怕且难以捕获,因为您可能在整个代码库中使用.parallel()
而没有任何问题,并且只需要一个类在使用.parallel()
时引入长时间运行的任务,突然之间,代码库的所有其他部分可能会获得不一致的性能。
所以我的结论是:
- 从理论上讲,如果你能保证在代码中任何地方创建的所有任务都很短,那么
.parallel()
很好。 .parallel()
可能会对系统范围的性能产生影响,除非您知道(例如,如果您稍后添加一段使用.parallel()
且具有长任务的代码,则可能会影响所有使用.parallel()
的代码的性能)- 由于
2.
你最好完全避免.parallel()
,要么使用ExecutorCompletionService
,要么使用 https://github.com/amaembo/streamex,它允许你提供自己的ForkJoinPool
(这允许更多的隔离)。更好的是,您可以使用 https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,它使您可以更精细地控制并发机制。