Java - 将工作拆分为多个线程



我遇到了以下问题:出于性能原因,我需要在多个线程之间拆分工作,但我不确定采取什么方法。

首先,我将提供的任务应该返回一个值并采用一个参数。此外,main 方法(执行主要工作,而不是static main())已经在单独的线程上运行并定期调用。此外,此方法必须在某个时候等待所有线程完成,然后继续。

一种方法(对我来说最明显)是在单独的线程上调度每个作业并将结果存储在类变量中:

public Object result1, result2;
public void mainMethod() throws InterruptedException {
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
result1 = expensiveMethod("param1");
}
});
final Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
result2 = expensiveMethod("param2");
}
});
thread1.join();
thread.join();
//Do rest of work
}
private Object expensiveMethod(Object param){
// Do work and return result
}

这有点丑陋且不理想,因为正如我所说,mainMethod 被调用了很多次,我不希望在设置结果变量时有任何竞争条件。理想情况下,我想使它们成为局部变量,但我无法使它们从 run 方法中访问,除非它们是最终的,然后我无法为它们赋值......

我考虑的另一种方法是:

public void mainMethod() throws InterruptedException, ExecutionException {
String obj1, obj2;
final ExecutorService executorService = Executors.newFixedThreadPool(16);
final Future<String> res1 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return expensiveMethod("param1");
}
});
final Future<String> res2 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return expensiveMethod("param2");
}
});
obj1 = res1.get();
obj2 = res2.get();
}
private String expensiveMethod(String param) {
// Do work and return result
}

这会自动等待来自 main 方法的这两个计算,并允许我将结果存储在本地。你们怎么看?还有其他方法吗?

你的ExecutorService方法几乎是最现代、最安全的方法。建议将Callable提取到单独的类:

public class ExpensiveTask implements Callable<String> {
private final String param;
public ExpensiveTask(String param) {
this.param = param;
}
@Override
public String call() throws Exception {
return expensiveMethod(param);
}
}

这将使您的代码更加干净:

final ExecutorService executorService = Executors.newFixedThreadPool(16);
final Future<String> res1 = executorService.submit(new ExpensiveTask("param1"));
final Future<String> res2 = executorService.submit(new ExpensiveTask("param2"));
String obj1 = res1.get();
String obj2 = res2.get();

一些注意事项:

  • 如果您只想同时处理两个任务,那么 16 个线程太多了 - 或者您可能想从多个客户端线程中重用该池?

  • 记得关闭游泳池

  • 使用轻量级ExecutorCompletionService等待完成的第一个任务,不一定等待提交的第一个任务。

如果您需要完全不同的设计理念,请查看 akka 及其基于参与者的并发模型。

首先,您可能希望从mainMethod()外部化ExecutorService的创建 如果频繁调用它,则可能会创建大量线程。

Future方法更好,因为这正是期货的目的。此外,它使阅读代码变得更加容易。

更轻松地说,尽管您可能必须将对象定义为 final,但您始终可以在对象上使用 setter 方法,无论您的引用是否为 final,都可以调用这些方法,从而可能允许您更改最终对象的值。(参考文献是最终对象不是!

略有不同的方法是:

  • 创建链接阻塞队列

  • 将其传递给每个任务。任务可以是线程,也可以是 j.u.c.Executor 上的 Runnables。

  • 每个任务将其结果添加到队列中

  • 主线程在循环中使用 queue.take() 读取结果

这样,结果在计算后立即处理。

您希望使用完成服务并跟踪提交的任务。
在你的循环中,你 take() 并在完成所有任务后退出循环。
扩展性非常好,您稍后会添加更多任务。

我将添加一个在我看来比为您的参数化Callable创建一个全新的类更优雅的建议。我的解决方案是一种返回Callable实例的方法:

Callable<String> expensive(final String param) {
return new Callable<String>() { public String call() { 
return expensiveMethod(param);
}};
}

这甚至使客户端代码更可口:

final Future<String> f1 = executor.submit(expensive("param1"));
private static final ExecutorService threadpool = Executors.newFixedThreadPool(3);
ArrayList<Future<List<Data>>> futures = new ArrayList<Future<List<Data>>>();
for (ArrayList<Data> data : map.values()) {
final Future<List<Data>> future = threadpool.submit(new ValidationTaskExecutor(data));
futures.add(future);
}
List<List<Data>> res = new ArrayList<List<Data>>();
for (Future<List<Data>> future : futures) {
allValidRequest.addAll(future.get());
}

最新更新