如何使芹菜任务调用异步任务



我有一个Django应用程序,它需要运行优化算法。该算法由两部分组成。第一部分是进化算法,该算法调用了第二部分的一定数量的任务,第二部分是模拟退火算法。问题是芹菜不允许任务调用异步任务。我试过下面的代码:

            sa_list = []
            for cromossomo in self._populacao:
                sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))
            job = group(sa_list)
            result = job.apply_async()
            resultados = result.get()

这段代码是进化算法的一部分,进化算法是一项芹菜任务。当我尝试运行它时,芹菜显示以下信息:

〔2015-12-02 16:20:15970:警告/Worker-1〕/home/arthur/django-user/local/lib/python2.7/site-packages/celece/result.py:45:RuntimeWarning:永远不要在任务中调用result.get()!看见http://docs.celeryq.org/en/latest/userguide/tasks.html#task-同步子任务

在Celery 3.2中,这将导致一个异常而不仅仅是一个警告。

尽管只是一个警告,芹菜似乎充满了任务和锁。

我找了很多解决方案,但都不起作用。

处理这一问题的一种方法是有一个两阶段的管道:

def first_task():
    sa_list = []
    for cromossomo in self._populacao:
        sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))
    job = group(sa_list)
    result = job.apply_async()
    result.save()
    return result.id

然后这样称呼它:

from path.to.tasks import app, first_task
result_1 = first_task.apply_async()
result_2_id = result_1.get()
result_2 = app.GroupResult.restore(result_2_id)
resultados = result_2.get()

还有其他方法需要做更多的工作——你可以用和弦来收集小组的结果。

问题不是芹菜不允许执行您的示例中的异步任务,而是您将遇到死锁,因此发出警告:

假设您有一个任务a,它通过apply_async()派生出许多子任务B。这些任务中的每一项都由一名工人执行。问题是,如果任务B的数量大于可用工作者的数量,则任务A仍在等待他们的结果(至少在您的示例中,这不是默认情况)。当任务A仍在运行时,已执行任务B的工人将不执行另一个任务,他们将被阻止,直到任务A完成。(我不知道确切的原因,但就在几周前我遇到了这个问题。)

这意味着,在手动关闭工作程序之前,芹菜无法执行任何操作。

解决方案

这完全取决于你将如何处理你的任务结果。如果您需要它们来执行下一个子任务,您可以通过使用回调进行链接或将其硬编码到相应的任务中来链接它们(这样您就可以调用第一个任务,调用第二个任务,依此类推)。

如果你只需要看看它们是否被执行,是否成功,你可以使用flower来监控你的任务。

如果需要进一步处理所有子任务的输出,我建议将结果写入xml文件:让任务A调用所有任务B,完成后执行处理结果的任务C。也许还有更优雅的解决方案,但这无疑避免了僵局。

最新更新