在将Celery与RabbitMQ一起使用时,我遇到了一个非常棘手的问题。
我有这样的东西:
from celery import group
group_of_tasks = group(task.s(x) for x in job_list)
result = group_of_tasks.delay()
print result.id # let's call this d453359d...
上面的操作很好,没有问题,我可以在result.results.中查询组的状态以及单个AsyncResults
但是,如果我尝试在单独的控制台中执行以下操作:
from celery.result import GroupResult
x = GroupResult.restore('d453359d...')
我得到以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:Python27libsite-packagesceleryresult.py", line 806, in restore
).restore_group(id)
File "C:Python27libsite-packagescelerybackendsamqp.py", line 297, in restore_group
'restore_group is not supported by this backend.')
NotImplementedError: restore_group is not supported by this backend.
我在Celery?中从taskset_id检索GroupResult时遇到了类似的问题?,其中提到了.save
,但使用它也会导致抛出NotImplementedError
。
在celery.backends.AMQP中定义的AMQP后端的源具有以下内容:
def save_group(self, group_id, result):
raise NotImplementedError(
'save_group is not supported by this backend.')
def restore_group(self, group_id, cache=True):
raise NotImplementedError(
'restore_group is not supported by this backend.')
所以,我的问题是,难道我没有办法只使用GroupResult
ID来重新创建一组结果吗?唯一的方法是存储组中每个AsyncResults
的ID并查询每个ID?
还是我只是错过了一些显而易见的东西?
我使用RabbitMQ在Windows上运行Python 2.7.10上的Celery。
您是正确的-在使用RabbitMQ时无法重新创建结果组-您需要使用支持此操作的其他结果后端,如Redis。