芹菜异步task_method的结果



我正在尝试获取任务的状态如下:

__init__.py

celery = Celery(app.name,backend='amqp',broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

foo.py

class Foo(object):  
    def bar(self):
        task = self._bar_async.apply_async()
        return task.id
    @celery.task(filter=task_method,bind=True)
    def _bar_async(task,self):
        for i in range(0,100):
            task.update_state(state='PROGRESS',meta={'progress':i})
            time.sleep(2)

taskstatus.py

def taskstatus(task_id):
    task = celery.AsyncResult(id=task_id)

这是将update_state与绑定一起使用的推荐方法吗?
此外,当我尝试使用 taskstatus 获取任务的状态时,我总是得到任务NoneType。问题出在哪里?

代码中有两个问题

首先,在方法中添加参数 selfapply_async

def bar(self):
        task = self._bar_async.apply_async([self])

此更改将修复获取任务的无类型问题。原因是任务将在辅助角色中失败,因此您无法获得结果。

其次,应该使用 taskstatus() 中的 app.backend.get_result 而不是 AsyncResult 来查看进度,因为 AsyncResult.get() 会阻塞,直到任务状态准备就绪。

from apps import celery
app = celery.app
r = app.backend.get_result(task_id)
print r

相关内容

  • 没有找到相关文章

最新更新