我有一个需求,在一个方法调用中,我想要访问三个不同的集合来检索数据。现在我正在一个接一个地获取数据。相反,我想创建三个线程,其中每个线程应该并行执行。然后在每件事完成后,我想合并结果并将其发送给客户。
public List<String> getUserData() {
// all the tasks should execute in parallel & finally
// merge all the result & send to the client.
task1();
task2();
task3();
}
有谁能帮我解决这个问题吗? 向所有线程传递相同的CyclicBarrier
// you have 3 threads,
CyclicBarrier meetingPoint=new CyclicBarrier(3);
// assigns their "barrier" variable to cyclic barrier
YourThreadGenerator thread1=new YourThreadGenerator(meetingPoint);
YourThreadGenerator thread2=new YourThreadGenerator(meetingPoint);
YourThreadGenerator thread3=new YourThreadGenerator(meetingPoint);
,然后在三个线程中的每个线程中,
this.barrier.await();
这确保所有线程等待,直到所有其他线程(有相同的循环屏障指令)到达这一行,然后程序流继续为他们,他们可以重复这个过程。
你甚至不需要显式同步。但是这对于许多线程来说很慢。如果你有几个线程,这是可以的。
为方便起见,您可以将主线程算作第4个线程并调用
meetingPoint.await();
。但是它需要用一个额外的线程作为
实例化 CyclicBarrier meetingPoint=new CyclicBarrier(4);
ExecutorService
为通用并行处理提供了健壮的高级支持。在这里,我假设你的任务是由某个返回记录的服务(svc
)提供的不同方法(task1()
, task2()
, & help;),结果是这些结果的并集。
void getUserData1()
throws InterruptedException, ExecutionException, CancellationException
{
List<Callable<List<String>>> tasks = Arrays.asList(svc::task1, svc::task2, svc::task3);
ExecutorService workers = Executors.newFixedThreadPool(tasks.size());
List<Future<List<String>>> tickets;
try {
tickets = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
}
finally {
workers.shutdown();
}
List<String> results = new ArrayList<>();
for (Future<List<String>> ticket : tickets)
results.addAll(ticket.get());
}
异常需要处理;这里,我指定了一个10秒的超时。如果一些结果在这段时间内没有准备好,则抛出CancellationException
。如果另一个线程通过中断这个线程来取消这个请求,则抛出InterruptedException
;如果一个任务失败,则抛出ExecutionException
。在所有这些情况下,您可能会以相同的方式响应客户端,使用"内部错误"消息。
在本例中,每个请求都创建和销毁ExecutorService
,这是昂贵的。在具有良好定义的生命周期的应用程序中,该生命周期定义了何时可以创建和销毁服务,因此可以为所有请求重用服务。在这种情况下,您可能需要一个缓存线程池,而不是一个固定的线程池。
也可以使用较新的ForkJoinPool
,或者构建在fork-join上的并行Stream
功能,但是它只适用于较窄的任务集。如果任务不阻塞或抛出检查异常,Stream
是简洁的。如果任务很耗时,但可以很容易地递归地细分为更小的任务,那么ForkJoinPool
可能值得使用。下面是一个使用Stream
:
List<String> getUserData2()
{
Stream<Supplier<List<String>>> s = Stream.of(svc::task1, svc::task2, svc::task3);
return s.parallel().map(Supplier::get).flatMap(List::stream).collect(Collectors.toList());
}