I have implemented a WorkersManager based on multiprocessing.Process and JoinableQueue. When I handle process exceptions, such as a timeout after proc.join(timeout), I evaluate proc.exitcode to decide what to do and then call in_queue.task_done() to notify the queue that the job was completed by the exception-handling logic. However, task_done() has to be invoked twice, and I have no idea why it should be called twice. Can anyone figure out the reason?
The complete code snippet:
# -*- coding=utf-8 -*-
import time
import threading
from queue import Empty
from multiprocessing import Event, Process, JoinableQueue, cpu_count, current_process

TIMEOUT = 3

class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = JoinableQueue(), JoinableQueue(), JoinableQueue()
        self._spawned_procs = []
        self._total = 0
        self._stop_event = Event()
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue,
            stop_event=self._stop_event
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn workers
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join(TIMEOUT)
        collector.join(TIMEOUT)

        self._in_queue.join()
        self._out_queue.join()
        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
                for _ in range(2):
                    # NOTE: Call task_done twice
                    # Guessing:
                    #   1st time to switch process?
                    #   2nd time to notify the task has been done?
                    # TODO: figure out why
                    self._in_queue.task_done()
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)
                    for _ in range(2):
                        # NOTE: Call task_done twice
                        # Guessing:
                        #   1st time to switch process?
                        #   2nd time to notify the task has been done?
                        # TODO: figure out why
                        self._in_queue.task_done()

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            try:
                r = self._out_queue.get()
                self._out_stream.append(r)
                self._out_queue.task_done()
                if len(self._out_stream) >= self._total:
                    print("Total {} jobs done.".format(len(self._out_stream)))
                    self._stop_event.set()
                    break
            except Empty:
                continue

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            self._run_queue.task_done()
            if running == self._total:
                break

class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue, stop_event):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue
        self._stop_event = stop_event

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while not self._stop_event.is_set():
            try:
                job = self._in_queue.get(timeout=.01)
            except Empty:
                continue

            if not job:
                self._in_queue.task_done()
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                # print() takes no exc_info argument; that was a logging-style bug
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)
            finally:
                self._in_queue.task_done()

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }

def main():
    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()

if __name__ == "__main__":
    main()
The problematic code is as follows:
self._out_queue.put(result)
for _ in range(2):
    # ISSUE HERE !!!
    # NOTE: Call task_done twice
    # Guessing:
    #   1st time to switch process?
    #   2nd time to notify the task has been done?
    # TODO: figure out why
    self._in_queue.task_done()
I need to call self._in_queue.task_done() twice to notify the JoinableQueue that the job has been finished by the exception-handling logic.
My guess is that the first call to task_done() is there to switch the process context, or something like that. Based on my tests, it is the second task_done() that takes effect:
worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 5
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.
If you call task_done() only once, the program blocks forever and never finishes.
The problem is that you have a race condition, defined as:

A race condition occurs in software when a computer program, to operate properly, depends on the sequence or timing of the program's processes or threads.

In the method WorkerProcess._work, the main loop begins:
while not self._stop_event.is_set():
    try:
        job = self._in_queue.get(timeout=.01)
    except Empty:
        continue

    if not job:
        self._in_queue.task_done()
        break
self._stop_event is being set by the _collect thread. Depending on where WorkerProcess._work is in its loop when this occurs, it can exit the loop leaving on the _in_queue the None that had been put there to signify that there are no more jobs. Evidently this happened twice, for two of the processes. It could even have happened for 0, 1, 2, or 3 of the processes.
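Here is a minimal, self-contained sketch of that race, with a thread standing in for the worker process (the names q, stop, and worker are illustrative, not from the code above):

from queue import Empty
from multiprocessing import JoinableQueue, Event
import threading

q, stop = JoinableQueue(), Event()
q.put(None)   # the sentinel the worker is supposed to consume and acknowledge

def worker():
    while not stop.is_set():   # if stop wins the race, the sentinel is never
        try:                   # taken off the queue and never task_done()-ed
            job = q.get(timeout=.01)
        except Empty:
            continue
        q.task_done()
        if job is None:
            return

stop.set()                     # the "collector" sets the event first
t = threading.Thread(target=worker)
t.start()
t.join()
print('worker exited with 1 item still on the queue')
# q.join()  # would now block forever: the put() was never matched by task_done()

Because the event is set before the sentinel is consumed, the put() is never matched by a task_done(); that deficit is exactly what your second task_done() call was papering over.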
The solution is to replace while not self._stop_event.is_set(): with while True: and to depend solely on finding None on the _in_queue to signify termination. This enables you to remove the extra calls to task_done for those processes that complete normally (in fact, you only ever needed one extra call per successfully completed process, not two).
But that is only half the problem. The other half is this code of yours:
def _join_workers(self):
    for p in self._spawned_procs:
        p.join(TIMEOUT)
        ...
        p.terminate()
Consequently, you are not giving the workers enough time to exhaust the _in_queue, so an arbitrary number of messages could be left on it (in your example, of course, there would just be the current "job" being processed plus the None sentinel, for a total of 2).
But that points to the general problem with the code: it is over-engineered. As an example, look at the first code snippet above. It can be further simplified to just:
while True:
    job = self._in_queue.get()  # blocking get
    if not job:
        break
Moreover, there is no reason to use JoinableQueue or Event instances at all, since a None sentinel placed on the _in_queue is sufficient to signify that the worker processes should terminate, especially given that you may be terminating the workers prematurely anyway. The simplified, working code is:
import time
import threading
from multiprocessing import Process, Queue, cpu_count, current_process

TIMEOUT = 3

class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = Queue(), Queue(), Queue()
        self._spawned_procs = []
        self._total = 0
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn workers
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminate threads
        monitor.join()
        collector.join()

        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }
                self._out_queue.put(result)
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }
                    self._out_queue.put(result)

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            r = self._out_queue.get()
            self._out_stream.append(r)
            if len(self._out_stream) >= self._total:
                print("Total {} jobs done.".format(len(self._out_stream)))
                break

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            if running == self._total:
                break

class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while True:
            job = self._in_queue.get()
            if not job:
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandled exception: {0}'.format(err))
                result = {"status": 'failed'}
                self._out_queue.put(result)

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }

def main():
    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()

if __name__ == "__main__":
    main()
This prints:
worker - 0 start to work
worker - 1 start to work
worker - 2 start to work
Process TIMEOUT: Process-1 3
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.
You may already be aware of this, but due diligence requires that I mention two excellent classes, multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, for doing what you are trying to accomplish. See this for a comparison.
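As a rough illustration only (not the author's code, and with somewhat different semantics, since an executor cannot terminate a running task the way Process.terminate() can), the same job list might look like this with ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError

TIMEOUT = 3

def run_job(job):
    time.sleep(job)
    return {'status': 'succeed'}

def main():
    jobs = [3, 4, 5, 6, 7]
    results = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_job, j) for j in jobs]
        for f in futures:
            try:
                # waits up to TIMEOUT seconds per future; on timeout we only
                # stop waiting -- the worker itself keeps running until the
                # pool shuts down at the end of the with-block
                results.append(f.result(timeout=TIMEOUT))
            except TimeoutError:
                results.append({'status': 'failed'})
    print(results)

if __name__ == '__main__':
    main()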
Further Explanation
What is the point of using a JoinableQueue, which supports calls to task_done? Normally, it is so you can be sure that all of the messages you have put on the queue have been taken off and processed, and so the main process will not terminate prematurely before that has happened. But this could not work correctly in your code, because you give the processes only TIMEOUT seconds to process their messages and then terminate them if they are still alive, possibly with messages still left on their queue. This is why you had to artificially issue extra calls to task_done just so your calls to join on the queues in the main process would not hang, and why you had to post this question in the first place.
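To make the accounting concrete, here is a minimal, self-contained sketch of how a JoinableQueue keeps count (the consume function is illustrative): join() returns only after task_done() has been called exactly once for every put(), including the sentinel.

from multiprocessing import JoinableQueue
import threading

q = JoinableQueue()
q.put('job')
q.put(None)              # the sentinel counts as an item too

def consume():
    q.get()              # 'job'
    q.task_done()        # unfinished count: 2 -> 1
    q.get()              # the sentinel
    q.task_done()        # unfinished count: 1 -> 0; omit this and join() hangs

threading.Thread(target=consume).start()
q.join()                 # returns only once every put() has been matched
print('all items acknowledged')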
So there are two ways you could have done things differently. One would have allowed you to continue using JoinableQueue instances and to call join on them to know when to terminate. But then (1) you would not be able to terminate your message processes prematurely, and (2) your message processes would have to handle exceptions correctly so that they never terminate early without having emptied their queues.
The other way is what I have proposed, and it is far simpler. The main process simply puts a special sentinel message on the input queue, in this case None. This is just a message that cannot be mistaken for an actual message to be processed and instead signifies end of file, in other words, a signal to the message process that there are no more messages on the queue and it may now terminate. Thus the main process only has to put, in addition to the "real" messages, the extra sentinel messages on the queue, and then, instead of issuing a join call on the message queues (which are now regular, non-joinable queues), it issues join(TIMEOUT) on each process instance. Either you will find the process is no longer alive because it has seen the sentinel, and therefore you know it has processed all of its messages, or you can call terminate on the process if you are willing to leave messages on its input queue.
Of course, to be really sure that the processes that terminated on their own actually emptied their queues, you might want to check those queues to verify that they are indeed empty. But I assume you should be able to code your processes to handle exceptions correctly, at least those that can be handled, so that they do not terminate prematurely and do something "reasonable" with every message.
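For what it's worth, since Queue.empty() is documented as unreliable, such a check would more likely drain the queue with get_nowait(); a hypothetical helper:

from queue import Empty

def drain(q):
    # pull everything still sitting on the queue without blocking
    leftovers = []
    while True:
        try:
            leftovers.append(q.get_nowait())
        except Empty:
            break
    return leftovers

# usage, after all worker processes have been joined or terminated:
# leftovers = drain(in_queue)
# if leftovers:
#     print('{} unprocessed message(s) left on the queue'.format(len(leftovers)))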