我有一个相当长的Celery
任务。几分钟以上。
有时,由于各种原因,一名工人被标记为解雇,另一名工人开始工作。如果运行它的机器需要更换,或者正在部署新的代码版本,就会发生这种情况。在这种情况下,工作者会收到SIGTERM信号。
我想知道任务本身是否有可能定期检查这个工作者是否已经收到SIGTERM并正在等待终止,在这种情况下,只需将任务放回队列并终止即可。(然后,任务将在另一个工人身上启动,并将继续执行其工作(
编辑:澄清-是否可以在任务中检查它是否在等待终止的工作人员身上执行。像这样:
# Some long task that can take even a few hours.
def some_task(...):
for i in range(...):
do_some_work()
# That's the missing function:
if did_this_worker_received_SIGTERM_and_waiting_to_be_terminated():
# stop the task in the middle, and it will be executed again later
当Celery工作程序收到SIGTERM时,它将启动热关闭。这意味着它将从所有队列中取消订阅,预取的任务(如果有的话(将返回到它们的队列,工作进程本身将在关闭之前开始等待当前运行的任务完成。如果这是你所害怕的,没有任务会丢失。
所有这些事件都可以处理(请参阅Worker Signals(。
如果你仍然坚持在你的任务中有一些额外的逻辑来处理工人状态,那么也许最简单的解决方案是实现工人关闭处理程序(如我上面提到的文档部分所述(,让它在Redis或其他分布式K/V存储中存储一个标志,重构需要这个标志的任务,让它们访问这个标志并执行您需要它们执行的任何操作。
我能问你为什么要做这样的事吗?是否启用了task_acks_late
?这样,如果一个任务不能按时完成,并且工作人员将停机,则该任务将在新的工作人员身上再次运行。
这是文件。还有task_reject_on_worker_lost,我没有尝试过,但也许它也能帮助你:
将此设置为true将允许消息重新排队,因此任务将由同一个或另一个工作人员再次执行工人