如何在工作线程关闭时向正在运行的任务发送消息?



我有一个可能需要几个小时才能完成的任务。 但是,可以通过在 memcached 中设置标志来发出信号以尽快干净退出。 任务会定期轮询标志,找到该标志时,任务会保存其状态并干净地退出。 如果我只是在缓存中手动设置标志,这一切都可以正常工作。

我想使用它来及时关闭工人,我只需要知道在哪里放置标志设置逻辑。 这是我尝试过的:

worker_shutdown处理程序

我尝试将其放入worker_shutdown处理程序中,但看起来只有在所有任务完成后才会调用它,因此它无助于关闭我长时间运行的任务。

通过celery.platforms.signals的信号处理程序

我尝试使用celery.platforms.signals接口附加 TERM 和 INT 信号处理程序,但我的处理程序从未被调用:

@worker_init.connect
def handle_worker_init(*args, **kwargs):
platforms.signals["TERM"] = handle_worker_shutdown
platforms.signals["INT"] = handle_worker_shutdown

def handle_worker_shutdown(*args, **kwargs):
logger.info("Setting database cleanup stop flag to save our ship")
database_cleanup_stop()

我也尝试在worker_process_init中执行此操作,但我的处理程序仍未被调用。(即使它被调用也会有点令人讨厌,因为每个工作进程大概都会设置标志,但我只需要设置一次。 无害,但令人讨厌。


是否有其他信号可以设置我的"提前退出"标志? 或者解决这个问题的任何其他建议?

使用worker_shutting_down信号:

from celery.signals import worker_shutting_down
@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, ** kwargs):
print(f 'worker_shutting_down({sig}, {how}, {exitcode})')

在工作人员开始关机流程时调度。像魅力一样工作...

相关内容

  • 没有找到相关文章

最新更新