使用Python RQ,我们尝试动态管理工作进程。我们使用一个定制的worker脚本,其(简化形式)如下:
from rq import Connection, Worker
queues_to_listen_on = get_queues_to_listen_on()
with Connection(connection = get_worker_connection()):
w = Worker(queues_to_listen_on)
w.work()
我们对工人的停工特别感兴趣。我们主要关心的是如何在关闭一个worker之前,以一种能够完成当前工作的方式优雅地关闭它。适当的Worker
对象上的request_stop(...)
信号处理程序似乎做了我们需要的,但似乎没有办法(至少据我所知)发射它,除非它是通过在终端中运行的工作进程上按下CTRL+C
。
在我看来,有两种可能的解决方案(肯定有更多)-按偏好顺序:
- 通过编程方式,使用
rq
库,将信号发送到request_stop
,从而触发优雅关机。 - 以某种方式获取正确进程的pid(不确定是主力进程还是工作侦听器进程),并使用其他方法向该进程发送适当的信号。我们有一些方法可以做到这一点,但它很可能需要更多的工作,并引入其他变量的问题,我宁愿被忽略(例如,使用
Fabric
运行远程命令或沿着那些行)。
如果有更好的方法来解决这个问题,或者有不同的替代方案可以达到相同的目标,我将非常感谢您的建议。
选项1在设计方面肯定更好。
然而,为了解决必须使用CTRL + C
退出进程的特定问题(我也讨厌这样),您可以为您的工人使用以下策略:
# WORKER_NAME.py
import os
PID = os.getpid()
@atexit.register
def clean_shut():
print "Clean shut performed"
try:
os.unlink("WORKER_NAME.%d" % PID)
except:
pass
# Worker main
def main():
f = open("WORKER_NAME.%d" % PID, "w")
f.write("Delete this to end WORKER_NAME gracefully")
f.close()
while os.path.exists("WORKER_NAME.%d" % PID):
# Worker working
在您的主脚本中,获得@Borys建议的工作PID,发送温暖停止请求,和os.unlink("path/to/WORKER_NAME.%d" % worker_PID)
以确保优雅关闭:)
这只适用于运行无限循环的工人。如果工作进程调用的东西甚至阻塞了普通的顺序一次性作业,则必须进一步跟踪到可能阻塞的例程以从那里解决,例如应用某种超时策略。