我正在尝试并行执行一些繁重的计算。为此,我使用Akka(Java)。
我没有定义参与者,而是按照文档的建议将我的计算包装到未来中。
计算是并行正确执行的,但问题是为Future<>
打开的线程永远不会关闭,在某些时候我遇到错误:OutOfMemory: unable to create new native thread
我的代码结构如下所示:
public void compute(){
for(Attribute attribute : attributes){
computeAttribute(attribute);
}
}
private void computeAttribute(Attribute attribute){
ActorSystem system = ActorSystem.create("System");
int nb = findNumberOfIterations(attribute);
List<Future<AttributeResult>> answers = new ArrayList<>();
// Computation to be performed in parallel
for (int i = 0; i < nb; i++) {
// MasterCaller contains the heavy computation logic
Future<AttributeResult> f = Futures.future(new MasterCaller(attribute, i), system.dispatcher());
answers.add(f);
}
Future<Iterable<AttributeResult>> futures = Futures.sequence(answers, system.dispatcher());
Future<AttributeResult> futureTotal = futures.map(new MasterMapper(), system.dispatcher());
try {
// Additional processing step after all resulted have been computed
AttributeResult value = Await.result(futureTotal, Duration.create(1, TimeUnit.SECONDS));
postProcess(value, attribute);
} catch (Exception e) {
e.printStackTrace();
}
}
假设每次调用 computeAttribute()
时都会创建大约 10 个线程。然后,如果attributes
列表中有 1000 个元素,代码将继续创建 10 000 个活动线程!
我非常感谢在这方面得到一些帮助,因为这个问题使 Akka 的使用和并行计算变得不可能。
问题是你为每次调用computeAttribute
创建一个新的ActorSystem,并且你永远不会关闭它。由于您在 Akka 中不使用任何东西,因此我建议您的 computeAttribute
-方法 将scala.concurrent.ExecutionContext
作为参数,然后您可以通过 ExecutionContext.fromExecutorService
或 ExecutionContext.fromExecutor
传入您自己创建的参数(您仍然需要在适当的时间点关闭它们)或传入ExecutionContext.global()
。