我需要在芹菜任务中实现以下逻辑:如果满足某些条件,请关闭当前工作线程并重试任务。
在示例任务上测试:
@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
try:
raise Exception('test exection')
except Exception as exc:
print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
app.control.shutdown(destination=[self.request.hostname]) # send shutdown signal to the current worker
raise self.retry(exc=exc, countdown=5)
print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
return 'some result'
但它给出了奇怪的结果,步骤:
- 运行辅助角色:
celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1
。 - 将任务推送到"test_queue"队列。
- 工人抓住了它并关闭了。我在 RabbitMQ 的"test_queue"中打开了任务列表,看到:
- 发布者提交的原始任务,重试次数 = 0(来自 app.control.shutdown(( 调用(;
- 原始任务的副本(具有相同的 id(,重试次数 = 1(来自 self.retry(( 调用(。
- 然后我启动另一个工人到同一个队列,它捕获了任务并关闭了。但在 Broker 上,原始任务的另一个副本被推送到具有相同 id 的队列中,重试次数 = 1。因此,我的队列中有 3 个任务。所有接下来的工作线程都为队列提供了 + 1 个新任务。在这种情况下,条件 max_retries = 1 不起作用。
我尝试过的:
- 在 celeryconfig.py 中设置
task_reject_on_worker_lost = True
并运行相同的任务。结果:没有任何变化。 - 在工作人员的任务中仅保留关机呼叫。结果:每次尝试时仅推送原始任务(没有重复任务(,但不计算重试次数(始终设置为 0(;
- 在工作线程中关闭和重试调用之前添加
app.control.revoke(self.request.id)
(基于此(。结果:第一次尝试后得到相同的结果(队列中有 2 个任务(,但是当我运行第二个工作线程队列时,刷新了它没有运行任何内容。因此,任务丢失且不会重试。
有没有办法在呼叫期间不将原始任务推回队列app.control.shutdown()
?看来,这才是根本原因。或者您能否建议另一种解决方法,以实现上述正确逻辑。
设置:RabbitMQ 3.8.2,芹菜 4.1.0,python 3.5.4
celeryconfig.py 中的设置:
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True
看起来问题task_acks_late
在您的配置文件中。通过使用它,您说"仅在我完成运行后从队列中删除任务"。然后你杀死了工作人员,所以它永远不会被确认(并且你会得到任务的副本(。