我有一个场景,需要将任务的状态更新为自定义值,然后读取它并根据该值应用一些逻辑。
这是我的芹菜配置:
celery = Celery(app.import_name,
backend='redis://127.0.0.1:6379/0',
broker='redis://127.0.0.1:6379/0')
celery.conf.update(CELERY_TASK_SERIALIZER='pickle',
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0',
CELERY_IGNORE_RESULT=False,
CELERY_ALWAYS_EAGER=False,
CELERY_ACCEPT_CONTENT=['pickle'],
CELERY_RESULT_SERIALIZER='pickle')
所以我基本上是用redis+pickle。更新状态的调用与类似
self.update_state('foo')
但是当我这样调用我的任务并检查状态时:
result = task.delay(*args)
print(result.state)
我总是得到PENDING
或SUCCESS
,所以任何中间值都会被跳过,即使更新的调用肯定是
如果我检查结果后端类型,我会得到Redis,并且ignore_result
选项设置为False,所以我在这里找到的建议修复都不起作用。
如果我们看看update_state
方法的签名,它看起来像这个
def update_state(self, task_id=None, state=None, meta=None):
"""Update task state.
:keyword task_id: Id of the task to update, defaults to the
id of the current task
:keyword state: New state (:class:`str`).
:keyword meta: State metadata (:class:`dict`).
当我们运行时
self.update_state('foo')
尝试将task_ id为CCD_ 5的任务更新为状态CCD_。
相反,我们应该尝试
self.update_state(state='foo')
其将当前任务状态更新为CCD_ 7。