Java并行流在使用newCachedThreadPool()时没有使用最佳线程数



我对数据库的并行读取做了两个独立的实现。第一个实现是使用ExecutorServicenewCachedThreadPool()构造函数和期货:我只是做一个调用,为每个读取情况返回一个未来,然后在我做所有的调用之后,我调用get()。这个实现工作得很好,而且足够快。

第二个实现是使用并行流。当我将并行流调用放入相同的ExecutorService池时,它的工作速度几乎慢了5倍,似乎它没有使用我希望的那么多线程。当我把它放到ForkJoinPool pool = new ForkJoinPool(50)中,它的工作速度和以前的实现一样快。

我的问题是

为什么并行流在newCachedThreadPool版本中没有充分利用线程?

这是第二个实现的代码(我没有发布第一个实现,因为一个工作正常):

private static final ExecutorService pool = Executors.newCachedThreadPool();
final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream =
                personIdList.stream().flatMap(
                        personId -> movieIdList.stream().map(
                                movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId))).collect(Collectors.toList());
final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> {
      final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream().map(
            inputPair -> {
                   return FeedbackDao.find(inputPair.getKey(), inputPair.getValue());
            }).filter(Objects::nonNull);
return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId));
});

这与ForkJoinTask。如果当前线程来自ForkJoinPool,它将使用相同的池来提交新任务,如果不是,它将使用与本地机器处理器总数相同的公共池,当您使用Executors.newCachedThreadPool()创建池时,该池创建的线程不会被识别为来自ForkJoinPool,因此它使用公共池。

下面是它是如何实现的,它应该可以帮助你更好地理解:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

Executors.newCachedThreadPool()池创建的线程不会是ForkJoinWorkerThread类型,因此它将使用未优化池大小的公共池来提交新任务。

相关内容

  • 没有找到相关文章

最新更新