如何在Celery worker中获取当前任务的吊销标志



我想在Celery+RabbitMQ broker上通过清理过程实现任务取消。我如何在Celery worker中获得当前任务的"REVOKED"状态?

# tasks.py -- celery worker
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
  for i in range(0, 10):
    time.sleep(1)
    # I want check here for cleanup.
  return x + y
# caller.py
from tasks import add
result = add.delay(4, 4)
result.revoke()

Celery支持可中止的任务,但它只适用于数据库后端。

Python 3.4.1/Celery 3.1.17/RabbitMQ 3.4.4

Felippe Da Motta Raposo的建议在我的自定义任务中起作用:

from celery import Task
from celery.task.control import inspect
WORKER_NAME = "celery@server"
inspector = inspect([WORKER_NAME])
class CustomTask(Task):
    def _is_revoked(self):
        revoked_list = inspector.revoked()
        return (revoked_list and self.task_id in revoked_list[WORKER_ADDRESS]
    def run(self, *args, **kwargs):
        self.task_id = self.request.id

查看scheduled_tasks,您可以询问芹菜是否计划运行您的任务。

例如:

    import celery
    celery_inspect = celery.current_app.control.inspect()
    celery_inspect.registered_tasks()

此方法返回一个dict,其中包含按worker调度的所有celen任务。

相关内容

  • 没有找到相关文章

最新更新