在芹菜中执行顺序任务



我有一些"繁重"的数据库请求,我将使用芹菜执行。考虑到它们很"重",我想按顺序执行它们(一个接一个)。一种可能的解决方案是在命令行中将--concurrency=1指定给Celery。这是有效的。但是有一个问题:当任务正在执行时,所有以下请求返回None:

from celery.task.control import inspect
# Inspect all nodes.
i = inspect()
print(i.scheduled()) # None
print(i.active()) # None
print(i.reserved()) # None
print(i.registered()) # None

同样,运行celery inspect ping返回Error: No nodes replied within time constraint.,因此我无法接收到有关芹菜队列状态的任何信息。

有我的测试python模块:

celeryconfig.py

#BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'amqp://'
#CELERY_RESULT_BACKEND = "redis"
CELERY_RESULT_BACKEND = "amqp://"
# for php
CELERY_TASK_RESULT_EXPIRES = None
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACKS_LATE = True

tasks.py

from celery import Celery
from time import sleep
app = Celery('hello')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
    sleep(30)       
    return x + y

client.py

from tasks import add
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
所以,问题是,如何一个一个地运行任务,并且能够检查队列的状态?

芹菜的检查是通过将查询广播到任何侦听的对象,然后收集响应来执行的。任何在超时(我认为默认是1秒)内没有响应的worker都将被忽略。就好像它不存在一样。

使用--concurrency=1应该不是问题。我刚试过了,在这里很好用。即使并发性为1,芹菜工作线程通常也会有一个额外的执行线程用于通信。(我说"通常",因为我确信有办法配置芹菜射击自己的脚。我所说的与默认值保持一致。)当我尝试--concurrency=1时,实际上每个worker有两个线程。因此,即使工作线程忙于计算任务,也应该有一个线程能够响应广播。

也就是说,如果机器负载很重,那么工人可能需要很长时间才能做出响应。我解决这个问题的方法是重试像i.scheduled()这样的电话,直到我得到每个人的答案。在我的项目中,我知道有多少工人应该启动并运行,所以我有一个列表,我可以用来知道是否每个人都有响应。

相关内容

  • 没有找到相关文章

最新更新