Akka 未来线程永远不会发布



我正在尝试并行执行一些繁重的计算。为此,我使用Akka(Java)。

我没有定义参与者,而是按照文档的建议将我的计算包装到未来中。

计算是并行正确执行的,但问题是为Future<>打开的线程永远不会关闭,在某些时候我遇到错误:OutOfMemory: unable to create new native thread

我的代码结构如下所示:

public void compute(){
    for(Attribute attribute : attributes){
        computeAttribute(attribute);
    }
}
private void computeAttribute(Attribute attribute){
    ActorSystem system = ActorSystem.create("System");
    int nb = findNumberOfIterations(attribute);
    List<Future<AttributeResult>> answers = new ArrayList<>();
    // Computation to be performed in parallel
    for (int i = 0; i < nb; i++) {
        // MasterCaller contains the heavy computation logic
        Future<AttributeResult> f = Futures.future(new MasterCaller(attribute, i), system.dispatcher());
        answers.add(f);
    }
    Future<Iterable<AttributeResult>> futures = Futures.sequence(answers, system.dispatcher());
    Future<AttributeResult> futureTotal = futures.map(new MasterMapper(), system.dispatcher());
    try {
        // Additional processing step after all resulted have been computed
        AttributeResult value = Await.result(futureTotal, Duration.create(1, TimeUnit.SECONDS));
        postProcess(value, attribute);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

假设每次调用 computeAttribute() 时都会创建大约 10 个线程。然后,如果attributes列表中有 1000 个元素,代码将继续创建 10 000 个活动线程!

我非常感谢在这方面得到一些帮助,因为这个问题使 Akka 的使用和并行计算变得不可能。

问题是你为每次调用computeAttribute创建一个新的ActorSystem,并且你永远不会关闭它。由于您在 Akka 中不使用任何东西,因此我建议您的 computeAttribute -方法 将scala.concurrent.ExecutionContext作为参数,然后您可以通过 ExecutionContext.fromExecutorServiceExecutionContext.fromExecutor 传入您自己创建的参数(您仍然需要在适当的时间点关闭它们)或传入ExecutionContext.global()

最新更新