>想象一下,有一个长时间运行的任务,其中包含一组特定的参数和kwarg。在开始新任务之前,是否有机会使用相同的参数/kwarg 撤销所有正在运行和挂起的任务,因为我只对上次添加的任务的结果感兴趣。(基础数据在两次调用之间更改)
我尝试迭代inspect.active()
、inspect.registered()
和inspect.scheduled()
的结果,以获得所有任务,然后filter/revoke
那些有问题的参数和kwargs的任务。
但这并不可靠,因为检查所有工人和搜索任务花费的时间太长。
谁能让我朝着正确的方向前进?
如果检查工作线程的速度很慢,最好通过辅助数据存储进行检查,例如像 redis 这样的键值存储。您为函数添加"锁",因此您知道您已经启动了它。
排队任务时:
- 检查您的商店中是否已存在
- 是:引发异常
- 否:将相关任务信息作为项目添加到您的商店。
- 将任务添加到芹菜任务队列
工作人员将执行它
- 执行它
- 从存储中删除
有一个基于 Redis 的现成实现:
celery-once
(永久链接到当前提交)。
您必须将其指定为任务的base
from celery import Celery
from celery_once import QueueOnce
@celery.task(base=QueueOnce)
def sum(a, b):
...
您还可以指定要考虑的参数以及更多设置,请阅读自述文件以获取更多详细信息。