芹菜在django数据库执行前撤销任务



出于并发性原因,我使用Django数据库而不是RabbitMQ。

但是我无法解决在任务执行之前撤消任务的问题。

我找到了一些关于这件事的答案,但它们似乎不完整,或者我得不到足够的帮助。

  • 第一次回答
  • 第二回答

我如何使用一个模型扩展芹菜任务表,添加一个布尔字段(撤销)设置当我不想要的任务执行?

谢谢。

由于芹菜通过ID跟踪任务,因此您真正需要的是能够判断哪些ID已被取消。不需要修改kombu的内部结构,您可以创建自己的表(或memcached等)来跟踪已取消的ID,然后检查当前可取消任务的ID是否在其中。

这是支持远程revoke命令的传输内部所做的:

所有工作节点都保留一个被撤销任务id的内存,或者在内存中或在磁盘上持久化(参见持久化撤销)。(from Celery docs)

当你使用django传输时,你要自己负责。在这种情况下,由每个任务来检查它是否已被取消。

因此,任务的基本形式(添加日志以代替实际操作)变成:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)
@shared_task
def my_task():
    if task_canceled(my_task.request.id):
        raise Ignore
    logger.info("Doing my stuff")

可以扩展&可以通过各种方式改进它,例如创建一个CancelableTask基类,就像您链接到的其他答案之一一样,但这是基本表单。您现在缺少的是模型和检查模型的功能。

注意,在这种情况下,ID将是一个字符串ID,如a5644f08-7d30-43ff-a61e-81c165ad9e19而不是一个整数。您的模型可以像这样简单:

from django.db import models
class CanceledTask(models.Model):
    task_id = models.CharField(max_length=200)
def cancel_task(request_id):
    CanceledTask.objects.create(task_id=request_id)
def task_canceled(request_id):
    return CanceledTask.objects.filter(task_id=request_id).exists()

你现在可以通过观察你的芹菜服务的调试日志来检查行为,同时做以下事情:

my_task.delay()
models.cancel_task(my_task.delay())

相关内容

  • 没有找到相关文章

最新更新