如何(全局地)替换Java并行流的公共线程池后端



我想全局替换Java并行流默认使用的公共线程池,例如

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});

我知道可以通过向专用线程池提交这样的指令来使用专用的ForkJoinPool(参见Java 8并行流中的自定义线程池)。问题是

  • 是否有可能用其他实现(例如Executors.newFixedThreadPool(10))取代常见的ForkJoinPool ?
  • 是否有可能通过一些全局设置来实现,例如,一些JVM属性?

注释:我喜欢替换F/J池的原因是,因为它似乎有一个错误,使它无法用于嵌套并行循环。

嵌套并行循环性能差,可能导致死锁,参见http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

下面的代码会导致死锁:
// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {
    // (omitted:) do some heavy work here (consuming majority of time)
    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});

(即使没有任何额外的代码在"do work here",考虑到并行设置为<12) .

我的问题是如何替换FJP。如果您喜欢讨论嵌套并行循环,您可能会检查嵌套Java 8并行forEach循环执行差。这种行为是意料之中的吗?.

我认为这不是流API的使用方式。似乎您(错误地)将其用于简单地执行并行任务(关注任务,而不是数据),而不是进行并行流处理(关注流中的数据)。您的代码在某种程度上违反了流的一些主要原则。(我写的是"不知怎么的",因为它不是真的被禁止,而是不鼓励):避免状态和副作用。

除此之外(或者可能是因为副作用),您在外循环中使用了大量同步,这是除了无害之外的一切!

虽然文档中没有提到,但并行流在内部使用通用的ForkJoinPool。无论这是否是缺乏文件,我们必须简单地接受这一事实。ForkJoinTask的JavaDoc状态:

可以定义和使用可能阻塞的ForkJoinTasks,但是这样做需要三个进一步的考虑:(1)如果有任何其他任务的完成应该依赖于阻塞外部同步或I/O的任务。从未连接的事件式异步任务(例如,那些子类化了CountedCompleter的任务)通常属于这一类。(2)为了尽量减少对资源的影响,任务应该小;理想情况下,只执行(可能的)阻塞动作。(3)除非ForkJoinPool。使用ManagedBlocker API,或者已知可能阻塞的任务数量小于池的ForkJoinPool。getParallelism级别,池不能保证有足够的线程可用来确保进程或良好的性能。

再一次,你似乎在使用流来代替简单的for循环和executor服务。

  • 如果您只想并行执行n任务,请使用ExecutionService
  • 如果你有一个更复杂的例子,其中任务创建子任务,考虑使用ForkJoinPool(与ForkJoinTasks)代替。(它确保线程数量恒定,而不会因为太多任务等待其他任务完成而导致死锁的危险,因为等待任务不会阻塞其正在执行的线程)。
  • 如果你想处理数据(并行),考虑使用流API。
  • 您不能"安装"自定义公共池。它是在私有静态代码中内部创建的。但是你可以使用某些系统属性来影响并行性,线程工厂和公共池的异常处理程序(参见JavaDoc of ForkJoinPool)

不要混淆ExecutionServiceForkJoinPool。他们(通常)不是彼此的替代品!

尽管你最初的问题已经得到了很好的回答这个bug(您希望交换FJP实现的原因)似乎在java 1.8.0.40中得到了修复,这对您来说可能很重要。参见嵌套Java 8并行forEach循环执行差。这种行为是意料之中的吗?

相关内容

  • 没有找到相关文章

最新更新