是否可以确定是否存在具有特定任务id的任务?当我试图获得状态时,我总是挂起。
>>> AsyncResult('...').status
'PENDING'
我想知道给定的任务id是否是一个真正的芹菜任务id,而不是一个随机字符串。我想要不同的结果,这取决于是否有针对某个id的有效任务。
过去可能有一个具有相同id的有效任务,但结果可能已从后端删除。
Celery在发送任务时不写状态,这在一定程度上是一种优化(请参阅文档)。
如果你真的需要它,很容易添加:
from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish
# when using celery versions older than 4.0, use body instead of headers
@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
# the task may not exist if sent using `send_task` which
# sends tasks by name, so fall back to the default result backend
# if that is the case.
task = current_app.tasks.get(sender)
backend = task.backend if task else current_app.backend
backend.store_result(headers['id'], None, "SENT")
然后,您可以测试PENDING状态,以检测任务是否(似乎)没有已发送:
>>> result.state != "PENDING"
AsyncResult.state在任务ID未知的情况下返回PENDING。
待决
任务正在等待执行或未知。任何不是的任务id已知被暗示处于挂起状态。
http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending
如果您需要区分未知ID和现有ID,可以提供自定义任务ID:
>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
...
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
...
现在我使用以下方案:
- 获取任务id
- 设置为memcache键,如"task_%s"%task.id消息"已启动"
- 将任务id传递给客户端
- 现在,我可以从客户端监视任务状态(从任务消息设置到memcache)
- 从任务上的就绪-设置为memcache密钥消息"就绪"
- 从客户端启动任务就绪-启动一个特殊任务,该任务将从memcache中删除密钥并执行必要的清理操作
您需要在创建的AsyncTask对象上调用.get()
才能从后端实际获取结果。
请参阅Celery常见问题解答。
进一步澄清我的回答。
从技术上讲,任何字符串都是有效的ID,无法验证任务ID。查明任务是否存在的唯一方法是询问后端是否知道,为此必须使用.get()
。
这引入了当后端没有任何关于您提供的任务ID的信息时.get()
会阻塞的问题,这是为了允许您启动任务,然后等待其完成。
在最初的问题中,我将假设OP想要获得先前完成的任务的状态。要做到这一点,你可以通过一个非常小的超时并捕捉超时错误:
from celery.exceptions import TimeoutError
try:
# fetch the result from the backend
# your backend must be fast enough to return
# results within 100ms (0.1 seconds)
result = AsyncResult('blubb').get(timeout=0.1)
except TimeoutError:
result = None
if result:
print "Result exists; state=%s" % (result.state,)
else:
print "Result does not exist"
不用说,只有当后端存储结果时,这才有效,如果不是,就无法知道任务ID是否有效,因为没有任何东西记录它们。
更进一步的澄清。
您想要做的事情无法使用AMQP后端完成,因为它不存储结果,而是转发结果。
我的建议是切换到数据库后端,这样结果就在一个数据库中,您可以在现有的芹菜模块之外进行查询。如果结果数据库中不存在任何任务,则可以认为ID无效。
所以我有了这个想法:
import project.celery_tasks as tasks
def task_exist(task_id):
found = False
# tasks is my imported task module from celery
# it is located under /project/project, where the settings.py file is located
i = tasks.app.control.inspect()
s = i.scheduled()
for e in s:
if task_id in s[e]:
found = True
break
a = i.active()
if not found:
for e in a:
if task_id in a[e]:
found = True
break
r = i.reserved()
if not found:
for e in r:
if task_id in r[e]:
found = True
break
# if checking the status returns pending, yet we found it in any queues... it means it exists...
# if it returns pending, yet we didn't find it on any of the queues... it doesn't exist
return found
根据https://docs.celeryproject.org/en/stable/userguide/monitoring.html不同类型的队列检查有:忙碌的调度,保留,撤销,已注册,统计数据,query_task,
所以随意挑选吧。
也许有一种更好的方法来检查队列中的任务,但目前这对我来说应该有效。
也许使用redis-direct是一个很好的解决方案。
pool = redis.ConnectionPool(host=config.REDIS_HOST,
port=config.REDIS_PORT,
db=config.REDIS_DB,
password=config.REDIS_PASSWORD)
redis_client = Redis(connection_pool=pool)
def check_task_exist(id):
for one in redis_client.lrange('celery', 0, -1):
task_info = json.loads(one.decode())
if task_info['headers']['id'] == id:
return True
return False
我找到了一种检查方法,它对我有效:
def check_task_exists(task_id):
inspector = app.control.inspect()
active_tasks = inspector.active()
# Check active tasks
if active_tasks:
for worker, tasks in active_tasks.items():
for task in tasks:
if task['id'] == task_id:
return True
# Check scheduled tasks
scheduled_tasks = inspector.scheduled()
if scheduled_tasks:
for worker, tasks in scheduled_tasks.items():
if task_id in tasks:
return True
# Check reserved tasks
reserved_tasks = inspector.reserved()
if reserved_tasks:
for worker, tasks in reserved_tasks.items():
if task_id in tasks:
return True
# Task not found
return False
尝试
AsyncResult('blubb').state
这可能奏效。
它应该返回一些不同的东西。
如果我错了,请纠正我。
if built_in_status_check(task_id) == 'pending'
if registry_exists(task_id) == true
print 'Pending'
else
print 'Task does not exist'