Java completableFuture:如何并行化几个CompletableFutures



这里是我的代码:

final int startIndex = LoaderConstants.ServeiTerritorial.DEFAULT_START_INDEX; 
final long pageSize = LoaderConstants.ServeiTerritorial.DEFAULT_PAGE_SIZE;
final String tableName = LoaderConstants.ServeiTerritorial.TIPUS_VIA_TABLE_NAME;
final String ownerType = LoaderConstants.ServeiTerritorial.TIPUS_VIA_OWNER_TYPE;
LongStream
.iterate(1, n -> n <= 1000 / pageSize, n -> n+1)
.mapToObj(pageNumber -> this.buildCompletableFutureOfResultSetType(tableName, ownerType, pageNumber * pageSize, pageSize))
.map(CompletableFuture::join)

我试图得到的是将每个项目并行化。

首先,对于每个页面,我构建一个CompletableFuture:

/**
* Builds a {@link CompletableFuture} in order to get oid.
*/
private CompletableFuture<ResultSetType> buildCompletableFutureOfResultSetType(
final String tableName,
final String owner,
final long pageNumber,
final long pageSize
) {
Supplier<ResultSetType> supplier = () -> this.serveiTerritorialCatalegsClientRepository.getCataleg(tableName, null, owner, null, null, null, pageNumber, pageSize);
return CompletableFuture.supplyAsync(supplier, this.servidorTerminologicExecutor);
}

当我意识到它是按顺序执行的时,我以为它运行得很好。这些是我的日志:

2021-07-21 12:46:35.894 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:35.895 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13801</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.896 DEBUG [hes-mpi-imdg-loader,9ffb4627802548bf,9ffb4627802548bf,false] 22536 --- [ool-1-thread-38] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13801</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.985 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:35.986 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13901</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:35.987 DEBUG [hes-mpi-imdg-loader,501c638f07534a0d,501c638f07534a0d,false] 22536 --- [ool-1-thread-39] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>13901</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.057 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:36.058 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14001</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.061 DEBUG [hes-mpi-imdg-loader,d840cc01f137b810,d840cc01f137b810,false] 22536 --- [ool-1-thread-40] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14001</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.141 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Preparant crida a servei-territorial.getOid (oid: 2.16.724.4.402)
2021-07-21 12:46:36.142 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Creant petició cap a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14101</startIndex><pageSize>100</pageSize></request>)
2021-07-21 12:46:36.142 DEBUG [hes-mpi-imdg-loader,9ec5d6687eb6e9aa,9ec5d6687eb6e9aa,false] 22536 --- [ool-1-thread-41] ServeiTerritorialOidClientRepositoryImpl : Enviant petició a servei-territorial (request: <request><oid>2.16.724.4.402</oid><startIndex>14101</startIndex><pageSize>100</pageSize></request>)

我不太明白我做错了什么。

有什么想法吗?

您在中间步骤中使用CompletableFutre.join调用运行此顺序流。

问题是:元素一次遍历一个(顺序的(流,并遍历每个步骤。当然,这包括join()调用。这意味着,对于每个元素,必须在下一个元素进入处理之前完成整个操作(包括"异步"部分this.serveiTerritorialCatalegsClientRepository.getCataleg(。

要解决此问题,请强制管道在开始调用join()调用之前为所有元素创建期货。像这样的东西应该起作用:

List<CompletableFuture<ResultSetType>> submittedTasks = LongStream
.iterate(1, n -> n <= 1000 / pageSize, n -> n+1)
.mapToObj(pageNumber -> this.buildCompletableFutureOfResultSetType(tableName, 
ownerType, pageNumber * pageSize,
pageSize))
.collect(Collectors.toList());

这样,终端collect()将在没有阻塞的情况下收集所有提交的CompletableFuture对象。您不需要收集到列表中,但任何将强制提交异步任务的终端操作都应该这样做;只要您能够分别对每个任务调用join即可。

之后,您可以阻塞,因为您知道所有异步任务都已启动或至少已排队。

submittedTasks.stream()
.map(CompletableFuture::join)
.forEach(...) //some terminal operation

这样可以避免不必要的阻塞代码。


我注意到我提到了";顺序的";有几次,但这并不意味着在并行流上运行相同的代码就能解决问题。主要问题将持续存在,尽管吞吐量可能会提高,因为你会在中并行阻塞

最新更新