芹菜工人在当前任务完成后不会接新任务



我有三项任务:

@app.task(name='timey')
def timey():
    print "timey"
    while True:
        pass
    return 1
@app.task(name='endtimey')
def endtimey():
    for i in range(10):
        print "ENDTIMEY", time()
        sleep(3)
    return 1
@app.task(name='nexttask')
def nexttask(n):
    print "NEXT TASK"
    return 1

如果我唯一做的就是把endtimey和nexttask连在一起——

chain(endtimey.s() | nexttask.s()).delay()

一切如预期。我在芹菜日志中看到ENDTIMEY <current time>打印了十次,然后是NEXT TASK。然而,如果我用无限任务timey填充7个工作者,然后将endtimeynexttask链接在一起-

for i in range(7):
    timey.s().delay()
chain(endtimey.s() | nexttask.s()).delay()

所有timey任务将由8个工作人员中的7个工作人员完成,endtimey将在第8个工作人上运行其进程,之后日志将显示已收到nexttask,但nexttask将不会运行。

为什么会这样?

此外,如果我终止了芹菜服务器,然后重新启动它,nexttask将是第一个运行的东西。

这是一个人为的例子,但我在一个更复杂的情况下遇到了一个问题,芹菜工人在完成当前任务后就不会接排队的任务。如果我在这种情况下重新启动芹菜,免费的工人将再次开始承担任务。

听起来问题似乎是芹菜的默认预取行为。每个工作进程将在当前处于最大容量时提前保留一定数量的任务,这被称为预取乘数。

它这样做的原因是,当你有大量的短任务时,如果任务已经被预取并准备立即执行,你的总体吞吐量会高得多。

问题是,当你有很多长时间运行的任务或长时间和短时间的混合任务时,即使其他工作人员可以处理,任务也会被繁忙的工作人员保留和阻塞

因此,在您的情况下,您可能需要将预取乘数降低到1。

相关内容

  • 没有找到相关文章

最新更新