如何正确地动态关闭Python RQ工作进程



使用Python RQ,我们尝试动态管理工作进程。我们使用一个定制的worker脚本,其(简化形式)如下:

from rq import Connection, Worker
queues_to_listen_on = get_queues_to_listen_on()
with Connection(connection = get_worker_connection()):
    w = Worker(queues_to_listen_on)
    w.work()

我们对工人的停工特别感兴趣。我们主要关心的是如何在关闭一个worker之前,以一种能够完成当前工作的方式优雅地关闭它。适当的Worker对象上的request_stop(...)信号处理程序似乎做了我们需要的,但似乎没有办法(至少据我所知)发射它,除非它是通过在终端中运行的工作进程上按下CTRL+C

在我看来,有两种可能的解决方案(肯定有更多)-按偏好顺序:

  1. 通过编程方式,使用rq库,将信号发送到request_stop,从而触发优雅关机。
  2. 以某种方式获取正确进程的pid(不确定是主力进程还是工作侦听器进程),并使用其他方法向该进程发送适当的信号。我们有一些方法可以做到这一点,但它很可能需要更多的工作,并引入其他变量的问题,我宁愿被忽略(例如,使用Fabric运行远程命令或沿着那些行)。

如果有更好的方法来解决这个问题,或者有不同的替代方案可以达到相同的目标,我将非常感谢您的建议。

选项1在设计方面肯定更好。

然而,为了解决必须使用CTRL + C退出进程的特定问题(我也讨厌这样),您可以为您的工人使用以下策略:

# WORKER_NAME.py
import os
PID = os.getpid()
@atexit.register
def clean_shut():
    print "Clean shut performed"
    try:
        os.unlink("WORKER_NAME.%d" % PID)
    except:
        pass
# Worker main
def main():
    f = open("WORKER_NAME.%d" % PID, "w")
    f.write("Delete this to end WORKER_NAME gracefully")
    f.close()
    while os.path.exists("WORKER_NAME.%d" % PID):
        # Worker working

在您的主脚本中,获得@Borys建议的工作PID,发送温暖停止请求,和os.unlink("path/to/WORKER_NAME.%d" % worker_PID)以确保优雅关闭:)

这只适用于运行无限循环的工人。如果工作进程调用的东西甚至阻塞了普通的顺序一次性作业,则必须进一步跟踪到可能阻塞的例程以从那里解决,例如应用某种超时策略。

最新更新