I'm trying to use multiprocessing.Pool to dispatch some jobs asynchronously to external processes, e.g.:
#!/bin/env python3
'''
test.py
'''
import multiprocessing.util
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE
multiprocessing.util.log_to_stderr(multiprocessing.util.DEBUG)
def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print(f"{so}\n{se}")
    return arg

def main1():
    proc_pool = Pool(4)
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print(r)

if __name__ == '__main__':
    main1()
On SIGINT I'd like it to terminate all of the invoked subprocesses, the pool workers, and itself. Currently this works with a pool size of 4:
$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140194873397248
[DEBUG/MainProcess] created semlock with handle 140194873393152
[DEBUG/MainProcess] created semlock with handle 140194873389056
[DEBUG/MainProcess] created semlock with handle 140194873384960
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-2] process shutting down
[DEBUG/ForkPoolWorker-2] running all "atexit" [DEBUG/ForkPoolWorker-3] runni[DEBUG/ForkPoolWorker-2] running the remaining[DEBUG/ForkPoolWorker-3] running the remaining "atexit" finalizers
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "./test.py", line 14, in waiter
so, se = p.communicate()
File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
stdout, stderr = self._communicate(input, en[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] result handler found thread._state=TERMINATE
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=1, thread._state=2
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
= self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-2] process exiting with exitcode 1
Process ForkPoolWorker-3:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "./test.py", line 14, in waiter
so, se = p.communicate()
File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
stdout, stderr = self._communicate(input, endtime, timeout)
File "/usr/local/lib/python3.6/subprocess.py", line 1496, in _communicate
ready = selector.select(timeout)
File "/usr/local/lib/python3.6/selectors.py", line 376, in select
fd_event_list = self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-3] process exiting with exitcode 1
$>
But as the pool size grows, it starts to fail intermittently, e.g.:
$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140143972425728
[DEBUG/MainProcess] created semlock with handle 140143972421632
[DEBUG/MainProcess] created semlock with handle 140143972417536
[DEBUG/MainProcess] created semlock with handle 140143972413440
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-12] process shutting down
[DEBUG/ForkPoolWorker-12] running all "atexit"[DEBUG/ForkPoolWorker-8] runnin[DEBUG/ForkPoolWorker-12] running the remaini[DEBUG/ForkPoolWorker-8] running the remaining "atexit" finalizers
Process ForkPoolWorker-4:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "./test.py", line 14, in waiter
so, se = p.communicate()
File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
stdout, stderr = self._communicate(input, endProcess ForkPoolWorker-9:
Traceback (most recent call last)[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] task handler exiting
(... Hangs here until I hit Ctrl-C again ...)
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 534, in _terminate_pool
cls._help_stuff_finish(inqueue, task_handler, len(pool))
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 519, in _help_stuff_finish
inqueue._rlock.acquire()
KeyboardInterrupt
$>
Most paths lead here, but the bug that workaround refers to appears to have been nominally fixed in 3.3+, which is presumably why this even works some of the time; either way, I tried that solution and it did not fix the problem. Does anyone have suggestions for a fix, or for debugging this further?
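The workaround is essentially the usual pattern of catching KeyboardInterrupt in the parent and shutting the pool down explicitly; a rough sketch of that pattern (illustrative only, not the exact code I ran):

# Rough sketch of the usual KeyboardInterrupt workaround (illustrative only).
# Uses waiter() and Pool from test.py above.
def main1():
    proc_pool = Pool(4)
    try:
        for r in proc_pool.imap_unordered(waiter, range(0, 4)):
            print(r)
    except KeyboardInterrupt:
        proc_pool.terminate()   # stop workers immediately on Ctrl-C
    else:
        proc_pool.close()       # normal completion: no more tasks
    finally:
        proc_pool.join()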
Environment: Python 3.6.1 on a SuSE box with 32 cores.
So, after a lot of digging, I tried some of the other combinations suggested in that post, and this seems to have fixed it. As far as I can tell, if you allow SIGINT to propagate to the child processes, as it does by default, and you have many more pool workers than jobs, a pool worker can be killed without releasing the _inqueue lock. _help_stuff_finish then deadlocks, trying to acquire the lock the worker never released. Having the workers ignore SIGINT and handling the interrupt in the parent avoids that:
#!/bin/env python3.6
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE
import signal

def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print(f"{so}\n{se}")
    return arg

########################
# Adapted from: https://stackoverflow.com/a/44869451
#
proc_pool = None

def int_handler(*args, **kwargs):
    # Tear the pool down from the parent when SIGINT arrives.
    if proc_pool:
        proc_pool.terminate()
        proc_pool.join()
    exit(1)

def initializer():
    # Workers ignore SIGINT, so they are never killed while holding the
    # pool's inqueue lock; the parent terminates them instead.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
proc_pool = Pool(32, initializer=initializer)
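# Install int_handler in the parent so Ctrl-C actually triggers the teardown
# above (assumed wiring: without this registration the handler defined earlier
# is never invoked). The workers forked by Pool() have already switched their
# SIGINT handler to SIG_IGN in initializer(), so only the parent is affected.
signal.signal(signal.SIGINT, int_handler)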
def main1():
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print(r)

if __name__ == '__main__':
    main1()
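If you would rather not install a handler at all, the same idea can also be expressed by keeping the SIG_IGN initializer for the workers and letting the parent catch KeyboardInterrupt around a map_async().get() call. This is only a sketch of that variant; I have not stress-tested it against the hang above:

#!/bin/env python3.6
# Variant sketch (assumed adaptation): workers still ignore SIGINT via the
# initializer, and the parent catches KeyboardInterrupt and terminates the
# pool itself.
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE
import signal

def waiter(arg):
    # Same external job as above: a long-running child process.
    p = subprocess.Popen(shlex.split("sleep 360"), stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print(f"{so}\n{se}")
    return arg

def initializer():
    # Workers ignore SIGINT; only the parent reacts to Ctrl-C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def main2():
    proc_pool = Pool(32, initializer=initializer)
    try:
        # A finite timeout keeps the wait interruptible (historically the
        # reason for the get(timeout) trick on older Pythons).
        results = proc_pool.map_async(waiter, range(0, 4)).get(3600)
        print(results)
    except KeyboardInterrupt:
        proc_pool.terminate()
    else:
        proc_pool.close()
    finally:
        proc_pool.join()

if __name__ == '__main__':
    main2()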