通过定期调用"join()"来避免僵尸进程



我正在用Python编写一个程序,该程序可以永久运行并随机接收请求 必须并行处理。每个请求可能需要数十分钟 处理并给 CPU 带来一些负担,因此 asyncio 不是一种选择。为 每个请求我都会启动一个新的工作进程。

问题是,如果我在完成后不打电话给join()工人, 它变成了一个僵尸过程。

我当前的解决方案是定期迭代所有工作进程并调用 如果它们完成,join()它们。有没有比使用multiprocessing.Queue.get()超时?也许是一种事件驱动的方法?还是在这种情况下使用超时完全没问题? 请参阅以下代码为我的 当前解决方案。

#!/usr/bin/env python3
import multiprocessing as mp
import queue
import random
import time
from typing import List

def main():
q = mp.Queue()
p_produce = mp.Process(target=produce, args=(q,))
p_receive = mp.Process(target=receive, args=(q,))
p_produce.start()
p_receive.start()
p_receive.join()
p_produce.join()

def produce(q: mp.Queue):
for i in range(10):
print(f"put({i})")
q.put(str(i))
time.sleep(random.uniform(2.0, 3.0))
q.put("EOF")

def receive(q: mp.Queue):
workers = []  # type: List[mp.Process]
while True:
to_join = [w for w in workers if not w.is_alive()]
for p_worker in to_join:
print(f"Join {p_worker.name}")
p_worker.join()
workers.remove(p_worker)
try:
request = q.get(block=True, timeout=1)  # Is there a better way?
except queue.Empty:
continue
if request == "EOF":
break
p_worker = mp.Process(target=worker, args=(request,), name=request)
p_worker.start()
workers.append(p_worker)
for p_worker in workers:
print(f"Join {p_worker.name}")
p_worker.join()

def worker(name: str):
print(f"Working on {name}")
time.sleep(random.uniform(2.0, 3.0))

if __name__ == "__main__":
main()

正如@Giannis评论中所建议的那样,您正在从头开始重新发明流程管理器。 坚持Python附带的内容,你对使用multiprocessing.Pool有一些异议吗? 如果是这样,怎么办?

执行此操作的常用方法是选择要同时运行的最大工作进程数。 说

NUM_WORKERS = 4

然后将其作为receive()函数的替代品:

def receive(q: mp.Queue):
pool = mp.Pool(NUM_WORKERS)
while True:
request = q.get()
if request == "EOF":
break
pool.apply_async(worker, args=(request,))
pool.close()
pool.join()

NUM_WORKERS流程创建一次,并跨任务重复使用。 如果由于某种原因您需要(或想要)每个任务的全新流程,则只需向Pool构造函数添加maxtasksperchild=1

如果出于某种原因你需要知道每个任务何时完成,你可以,例如,在apply_async()调用中添加一个callback=参数,并编写一个小函数,该函数将在任务结束时调用(它将作为参数接收,无论你的worker()函数返回什么)。

恶魔在恶魔中

因此,事实证明,您的实际应用程序中的工作进程希望(出于任何原因)创建自己的流程,而Pool创建的进程无法做到这一点。 它们被创建为"守护程序"进程。 从文档中:

当进程退出时,它会尝试终止其所有守护程序子进程。

请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时终止,则该守护进程将使其子进程成为孤立进程。

像泥一样清澈;-) 这里有一种精心设计的方法,可以创建非守护进程,但对我的口味来说太复杂了,Pool工作原理:

Python 进程池非守护进程?

回到你已经知道有效的原始设计,我只是改变它,将定期加入工作进程的逻辑与操作队列的逻辑分开。 从逻辑上讲,它们真的彼此无关。 具体来说,创建一个"后台线程"来加入对我来说很有意义:

def reap(workers, quit):
from time import sleep
while not quit.is_set():
to_join = [w for w in workers if not w.is_alive()]
for p_worker in to_join:
print(f"Join {p_worker.name}")
p_worker.join()
workers.remove(p_worker)
sleep(2)  # whatever you like
for p_worker in workers:
print(f"Join {p_worker.name}")
p_worker.join()
def receive(q: mp.Queue):
import threading
workers = []  # type: List[mp.Process]
quit = threading.Event()
reaper = threading.Thread(target=reap, args=(workers, quit))
reaper.start()

while True:
request = q.get()
if request == "EOF":
break
p_worker = mp.Process(target=worker, args=(request,), name=request)
p_worker.start()
workers.append(p_worker)
quit.set()
reaper.join()

我碰巧知道list.append()list.remove()在 CPython 中是线程安全的,因此无需使用锁来保护这些操作。 但是如果你添加一个,它不会有什么坏处。

还有一个尝试

虽然Pool创建的进程是守护进程,但类似concurrent.futures.ProcessPoolExecutor创建的进程似乎不是。 因此,我的第一个建议的这个简单变体可能对您有用(也可能不起作用;-)):

NUM_WORKERS = 4
def receive(q: mp.Queue):
import concurrent.futures as cf
with cf.ProcessPoolExecutor(NUM_WORKERS) as e:
while True:
request = q.get()
if request == "EOF":
break
e.submit(worker, request)

如果这对你有用,很难想象有什么比这更简单的了。

嗯,一种解决方案是使用像python rq或selery这样的工作队列。从本质上讲,您将生成n个worker(单独的进程),这些工作线程将查看要执行的任务的队列,然后在主程序上,您只需将任务推送到队列中并定期检查结果。

相关内容

  • 没有找到相关文章