我想全局替换Java并行流默认使用的公共线程池,例如
IntStream.range(0,100).parallel().forEach(i -> {
doWork();
});
我知道可以通过向专用线程池提交这样的指令来使用专用的ForkJoinPool(参见Java 8并行流中的自定义线程池)。问题是
- 是否有可能用其他实现(例如
Executors.newFixedThreadPool(10)
)取代常见的ForkJoinPool ? - 是否有可能通过一些全局设置来实现,例如,一些JVM属性?
注释:我喜欢替换F/J池的原因是,因为它似乎有一个错误,使它无法用于嵌套并行循环。
嵌套并行循环性能差,可能导致死锁,参见http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html
下面的代码会导致死锁:// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {
// (omitted:) do some heavy work here (consuming majority of time)
// Need to synchronize for a small "subtask" (e.g. updating a result)
synchronized(this) {
// Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
IntStream.range(0,100).parallel().forEach(j -> {
// do work here
});
}
});
(即使没有任何额外的代码在"do work here",考虑到并行设置为<12) .
我的问题是如何替换FJP。如果您喜欢讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行差。这种行为是意料之中的吗?.
我认为这不是流API的使用方式。似乎您(错误地)将其用于简单地执行并行任务(关注任务,而不是数据),而不是进行并行流处理(关注流中的数据)。您的代码在某种程度上违反了流的一些主要原则。(我写的是"不知怎么的",因为它不是真的被禁止,而是不鼓励):避免状态和副作用。
除此之外(或者可能是因为副作用),您在外循环中使用了大量同步,这是除了无害之外的一切!
虽然文档中没有提到,但并行流在内部使用通用的ForkJoinPool
。无论这是否是缺乏文件,我们必须简单地接受这一事实。ForkJoinTask
的JavaDoc状态:
可以定义和使用可能阻塞的ForkJoinTasks,但是这样做需要三个进一步的考虑:(1)如果有任何其他任务的完成应该依赖于阻塞外部同步或I/O的任务。从未连接的事件式异步任务(例如,那些子类化了CountedCompleter的任务)通常属于这一类。(2)为了尽量减少对资源的影响,任务应该小;理想情况下,只执行(可能的)阻塞动作。(3)除非ForkJoinPool。使用ManagedBlocker API,或者已知可能阻塞的任务数量小于池的ForkJoinPool。getParallelism级别,池不能保证有足够的线程可用来确保进程或良好的性能。
再一次,你似乎在使用流来代替简单的for循环和executor服务。
- 如果您只想并行执行
n
任务,请使用ExecutionService
- 如果你有一个更复杂的例子,其中任务创建子任务,考虑使用
ForkJoinPool
(与ForkJoinTasks
)代替。(它确保线程数量恒定,而不会因为太多任务等待其他任务完成而导致死锁的危险,因为等待任务不会阻塞其正在执行的线程)。 - 如果你想处理数据(并行),考虑使用流API。
- 您不能"安装"自定义公共池。它是在私有静态代码中内部创建的。但是你可以使用某些系统属性来影响并行性,线程工厂和公共池的异常处理程序(参见JavaDoc of ForkJoinPool)
不要混淆ExecutionService
和ForkJoinPool
。他们(通常)不是彼此的替代品!
尽管你最初的问题已经得到了很好的回答这个bug(您希望交换FJP实现的原因)似乎在java 1.8.0.40中得到了修复,这对您来说可能很重要。参见嵌套Java 8并行forEach循环执行差。这种行为是意料之中的吗?