使用CompletableFuture运行Java循环



我正在尝试使用CompletableFuture并行执行for循环。在循环中,我使用supplyAsync调用doSomething来获得输出字符串,然后将其放入HashMap:中

...
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
CompletableFuture<?> completableFuture = null;
for ( int i = 0; i < numberOfRecords; i++ ) {
final int finalI = i;
completableFuture = CompletableFuture
.supplyAsync( () -> doSomething( data, finalI ) )
.thenAccept( str -> map.put( finalI, str ) );
}
completableFuture.join();
private String doSomething(HashMap<String, String> data, int finalI ) ) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
for ( int k = 0; k < data.size(); k++ ) {
//process data and add it in queue
}
String result = processQueueAndReturnString(queue);
return result;

问题是,当for循环几乎完成时(当i接近numberOfRecords时(,doSomething方法中的另一个for循环会跳过一些迭代,例如,如果k=5,它只能运行循环直到k=2 or 3,在这种情况下,supplyAsync( () -> doSomething( data, finalI ) )返回null。因此,我的for循环和CompletableFuture循环似乎完成了,直到一些迭代完全完成。

关于如何解决这个问题,有什么建议或提示吗?

因此,我的for循环和CompletableFuture循环似乎在完成某些迭代之前就完成了。

示例代码中的每个循环迭代都会创建一个CompletableFuture。如果您想等待所有工作完成,您需要加入所有,而不仅仅是上一次迭代创建的工作。

类似这样的东西(风格纠正!(:

ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
CompletableFuture<Void>[] futures = new CompletableFuture<Void>[nosRecords];
for (int i = 0; i < nosRecords; i++) {
final int finalI = i;
futures[i] = CompletableFuture
.supplyAsync(() -> doSomething(data, finalI))
.thenAccept(str -> map.put(finalI, str));
}
CompletableFuture.allOf(futures);

请注意,您需要将CompletableFuture<?>更改为CompletableFuture<Void>,因为allOf()(javadoc(的声明需要这样做。幸运的是,thenAccept(...)调用已经返回了一个CompletableFuture<Void>


HashMapdata不是线程安全的,应该是吗?我只是在方法doSomething中使用它来获得基于索引finalI的条目值。我不处理那个HashMap。我刚读过。

supplyAsync调用和对其lambda参数的调用之间,之前将发生。因此,如果data在执行任何doSomething调用期间不发生更改,它们都将在data映射中看到正确的值。

假设事情如您所说(并保持这种状态(,那么可以在那里使用非同步的HashMap

Stephen C的Answer看起来是正确的,适用于今天的Java。但在未来(啊,看看我在那里做了什么?(,Java可能会提供一种更简单、更快的方法,使用虚拟线程。

Project Loom

Project Loom即将推出Java,现在可以在早期访问Java 16的基础上进行初步构建。

一个主要功能是虚拟线程(光纤(。这些是轻量级线程。当任何虚拟线程中的控制流阻塞时,Java会检测到该阻塞并在另一个虚拟线程中切换,以保持CPU核心繁忙。这可以大大加快频繁阻塞的线程代码(与视频编码等严格限制CPU的任务相反(。

请注意,根据Project Loom的工作人员之一Ron Pressler的说法,对CompletableFuture上许多方法中的大多数方法的需求随着虚拟线程而消失。您可能只需要拨打get。请参阅他的演讲,最近一次是2020-11-11、2020-09-17和2020-07-28。

虽然我没有捕捉到您业务逻辑的所有细微差别,但我想我已经掌握了要点。与Stephen C类似,我收集了所有返回的CompletableFuture对象。然后我检查它们,看看它们是否成功完成。

在Project Loom中,ExecutorService现在是AutoCloseable。因此,我们可以使用try with resources语法。尝试块的末尾将被阻止,直到所有提交的任务都完成为止。这种自然阻塞取代了Stephen C.在解决方案中看到的CompletableFuture.allOf(futures);

示例代码

这是我们任务的一个类,一个返回UUID对象的Callable。我们还将每项任务睡眠一秒钟,以展示一项冗长的任务。我们的任务还将其结果记录在我们传递给其构造函数的ConcurrentMap中。

package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
public class DoSomething implements Callable < UUID >
{
private Integer identifier;
private ConcurrentMap < Integer, UUID > results;
// Constructor
public DoSomething ( Integer identifier , ConcurrentMap < Integer, UUID > resultsMap )
{
this.identifier = identifier;
this.results = resultsMap;
}
@Override
public UUID call ( ) throws Exception
{
Thread.sleep( Duration.ofSeconds( 1 ) );
UUID uuid = UUID.randomUUID();
this.results.put( this.identifier , uuid );
return uuid;
}
}

以下是用于实例化和运行这些任务的代码。

public static void main ( String[] args )
{
System.out.println( "INFO - Java version: " + Runtime.version() );
System.out.println( "INFO - Host OS: " + System.getProperty( "os.name" ) + " version " + System.getProperty( "os.version" ) );
System.out.println( "INFO - arch: " + System.getProperty( "os.arch" ) + " | Available processors (cores): " + Runtime.getRuntime().availableProcessors() );
long maxMemory = Runtime.getRuntime().maxMemory();
System.out.println( "INFO - Maximum memory (bytes): " + String.format( Locale.getDefault() , "%,d" , ( maxMemory == Long.MAX_VALUE ? "no limit" : maxMemory ) ) );
System.out.println( "----------------------------------------------" );
long start = System.nanoTime();
ConcurrentMap < Integer, UUID > results = new ConcurrentSkipListMap <>();
int countTasks = 1_000_000;
System.out.println( "INFO - Starting a run of " + countTasks + ". " + Instant.now() );
List < CompletableFuture < UUID > > futures = new ArrayList <>( countTasks );
try (
ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
for ( int nthTask = 0 ; nthTask < countTasks ; nthTask++ )
{
executorService.submit( new DoSomething( nthTask , results ) );
}
}
// At this point, flow-of-control blocks until all submitted tasks finish (are done, or are cancelled).
List < CompletableFuture < UUID > > canceled = new ArrayList <>();
List < CompletableFuture < UUID > > completedExceptionally = new ArrayList <>();
for ( CompletableFuture < UUID > future : futures )
{
if ( future.isCancelled() )
{
canceled.add( future );
} else if ( future.isCompletedExceptionally() )
{
completedExceptionally.add( future );
} else if ( ! future.isDone() )
{
throw new IllegalStateException( "All tasks should be done at this point, normally or interrupted." );
} else
{
throw new IllegalStateException( "Should not be able to reach this point." );
}
}
Duration duration = Duration.ofNanos( System.nanoTime() - start );
System.out.println( "Done at " + Instant.now() + ". Took: " + duration );
System.out.println( "Problems… canceled size: " + canceled.size() + " | completedExceptionally size: " + completedExceptionally.size() );
System.out.println( "Results size = " + String.format( Locale.getDefault() , "%,d" , results.size() ) );
}
INFO - Java version: 16-loom+9-316
INFO - Host OS: Mac OS X version 10.14.6 
INFO - arch: x86_64 | Available processors (cores): 6
INFO - Maximum memory (bytes): 8,589,934,592
----------------------------------------------
INFO - Starting a run of 10000000. 2021-01-01T05:40:28.564019Z
Done at 2021-01-01T05:41:11.567852Z. Took: PT43.006895236S
Problems… canceled size: 0 | completedExceptionally size: 0
Results size = 10,000,000

运行一百万个这样的任务需要几秒钟的时间。跑一千万不到一分钟。

因此,您可以看到睡眠一秒钟的阻塞线程显然不会占用内核上的时间。如果他们在核心上花费时间,我们将等待很长一段时间:10000000个任务*每个任务1秒/6个核心=1666666秒=462小时。

最新更新