Java Thread wait() & notify() with parallel task



我有一个需求,在一个方法调用中,我想要访问三个不同的集合来检索数据。现在我正在一个接一个地获取数据。相反,我想创建三个线程,其中每个线程应该并行执行。然后在每件事完成后,我想合并结果并将其发送给客户。

public List<String> getUserData() {
    // all the tasks should execute in parallel & finally
    // merge all the result & send to the client.
    task1(); 
    task2();
    task3();
}
有谁能帮我解决这个问题吗?

向所有线程传递相同的CyclicBarrier

// you have 3 threads,
CyclicBarrier meetingPoint=new CyclicBarrier(3);
// assigns their "barrier" variable to cyclic barrier
YourThreadGenerator thread1=new YourThreadGenerator(meetingPoint);
YourThreadGenerator thread2=new YourThreadGenerator(meetingPoint);
YourThreadGenerator thread3=new YourThreadGenerator(meetingPoint);

,然后在三个线程中的每个线程中,

this.barrier.await();

这确保所有线程等待,直到所有其他线程(有相同的循环屏障指令)到达这一行,然后程序流继续为他们,他们可以重复这个过程。

你甚至不需要显式同步。但是这对于许多线程来说很慢。如果你有几个线程,这是可以的。

为方便起见,您可以将主线程算作第4个线程并调用

meetingPoint.await();

。但是它需要用一个额外的线程作为

实例化
 CyclicBarrier meetingPoint=new CyclicBarrier(4);

ExecutorService为通用并行处理提供了健壮的高级支持。在这里,我假设你的任务是由某个返回记录的服务(svc)提供的不同方法(task1(), task2(), & help;),结果是这些结果的并集。

void getUserData1()
  throws InterruptedException, ExecutionException, CancellationException
{
  List<Callable<List<String>>> tasks = Arrays.asList(svc::task1, svc::task2, svc::task3);
  ExecutorService workers = Executors.newFixedThreadPool(tasks.size());
  List<Future<List<String>>> tickets;
  try {
    tickets = workers.invokeAll(tasks, 10, TimeUnit.SECONDS);
  }
  finally {
    workers.shutdown();
  }
  List<String> results = new ArrayList<>();
  for (Future<List<String>> ticket : tickets)
    results.addAll(ticket.get());
}

异常需要处理;这里,我指定了一个10秒的超时。如果一些结果在这段时间内没有准备好,则抛出CancellationException。如果另一个线程通过中断这个线程来取消这个请求,则抛出InterruptedException;如果一个任务失败,则抛出ExecutionException。在所有这些情况下,您可能会以相同的方式响应客户端,使用"内部错误"消息。

在本例中,每个请求都创建和销毁ExecutorService,这是昂贵的。在具有良好定义的生命周期的应用程序中,该生命周期定义了何时可以创建和销毁服务,因此可以为所有请求重用服务。在这种情况下,您可能需要一个缓存线程池,而不是一个固定的线程池。

也可以使用较新的ForkJoinPool,或者构建在fork-join上的并行Stream功能,但是它只适用于较窄的任务集。如果任务不阻塞或抛出检查异常,Stream是简洁的。如果任务很耗时,但可以很容易地递归地细分为更小的任务,那么ForkJoinPool可能值得使用。下面是一个使用Stream:

的示例
List<String> getUserData2()
{
  Stream<Supplier<List<String>>> s = Stream.of(svc::task1, svc::task2, svc::task3);
  return s.parallel().map(Supplier::get).flatMap(List::stream).collect(Collectors.toList());
}

最新更新