现在我有一些同步作业,它们是有状态的,所以如果任务失败,我必须取消确认消息,然后让它们转到RabbitMQ
的前面。但是当我尝试引发错误时,我发现芹菜仍然承认此消息,并且队列已被清除。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
raise ValueError
我发现芹菜任务有一个名为retry
的方法,但它会将任务添加到队列的后面。这不是我想要的。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15)
即使我也不能用杀戮信号做到这一点:
os.kill(os.getpid(), signal.SIGKILL)
我该怎么办?芹菜是否提供了一些错误,以便我可以引发此错误以通知芹菜不确认我的消息?
根据文档,芹菜能够 RabbitMQ 优先级队列。 https://docs.celeryproject.org/en/latest/faq.html#does-celery-support-task-priorities
因此,您应该能够通过将重试的任务的优先级高于常规任务来将它们推到队列的前面。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15, priority=9)
或
根据此 github 问题,您还可以将重试的任务分配给新的专用队列,并分配资源以优先处理此队列。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15, queue='prioritized_queue_name')
在文档 https://docs.celeryproject.org/en/stable/userguide/configuration.html 上,我发现:
默认情况下,task_acks_on_failure_or_timeout
enabled
.
所以我认为你应该尝试组合
task_acks_late=True
+task_acks_on_failure_or_timeout=False
实现NO acknowledgement when a task fails
。