Python3 进程对象永远不会加入



让我首先说我没有使用队列,所以这个问题不是这个问题的副本,也没有使用进程池,所以它不是这个问题的副本。

我有一个 Process 对象,它使用线程工作线程池来完成某些任务。为了 MCVE,此任务只是构造一个从 0 到 9 的整数列表。这是我的来源:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process
from sys import stdout
class Quest():
def __init__(self):
pass
def doIt(self, i):
return i
class Test(Process):
def __init__(self, arg):
super(Test, self).__init__()
self.arg = arg
self.pool = Pool()
def run(self):
quest = Quest()
done = self.pool.map_async(quest.doIt, range(10), error_callback=print)
stdout.flush()
self.arg = [item for item in done.get()]
def __str__(self):
return str(self.arg)
# I tried both with and without this method
def join(self, timeout=None):
self.pool.close()
self.pool.join()
super(Test, self).join(timeout)

test = Test("test")
print(test) # should print 'test' (and does)
test.start()
# this line hangs forever
_ = test.join()
print(test) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

这是我希望我的实际程序做的一个非常粗略的模型。正如评论中指出的那样,问题在于Test.join总是永远挂起。这完全与该方法是否在 Test 类中被重写无关。它也从不打印任何内容,但是当我发送KeyboardInterrupt信号时,输出表明问题在于从工人那里获得结果:

test
^CTraceback (most recent call last):
File "./test.py", line 44, in <module>
Process Test-1:
_ = test.join()
File "./test.py", line 34, in join
super(Test, self).join(timeout)
File "/path/to/multiprocessing/process.py", line 124, in join
res = self._popen.wait(timeout)
File "/path/to/multiprocessing/popen_fork.py", line 51, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/path/to/multiprocessing/popen_fork.py", line 29, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
File "/path/to/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "./test.py", line 25, in run
self.arg = [item for item in done.get()]
File "/path/to/multiprocessing/pool.py", line 638, in get
self.wait(timeout)
File "/path/to/multiprocessing/pool.py", line 635, in wait
self._event.wait(timeout)
File "/path/to/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/path/to/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt

为什么愚蠢的过程不愚蠢退出?worker唯一要做的就是执行一个操作的单个取消引用和函数调用,它应该非常简单。

我忘了提:如果我Testthreading.Thread的子类而不是multiprocessing.Process个子类,这工作得很好。我真的不确定为什么这会把它分成两半。

  1. 您的目标是异步完成这项工作。为什么不从主进程生成异步子进程工作线程而不生成子进程(类 Test)?结果将在您的主流程中提供,无需做任何花哨的事情。如果您选择这样做,您可以在此处停止阅读。否则,请继续阅读。

  2. 您的联接将永远运行,因为有两个单独的池,一个是在创建进程对象(主进程的本地)时,另一个是在通过调用 process.start()(生成的进程的本地)来分叉进程时

例如,这不起作用:

def __init__(self, arg, shared):
super(Test, self).__init__()
self.arg = arg
self.quest = Quest()
self.shared = shared
self.pool = Pool()
def run(self):
iterable = list(range(10))
self.shared.extend(self.pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
print("1" + str(self.shared))
self.pool.close()

但是,这有效:

def __init__(self, arg, shared):
super(Test, self).__init__()
self.arg = arg
self.quest = Quest()
self.shared = shared
def run(self):
pool = Pool()
iterable = list(range(10))
self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
print("1" + str(self.shared))
pool.close()

这与以下事实有关:当您生成一个进程时,进程的整个代码、堆栈和堆段都会克隆到进程中,以便您的主进程和子进程具有单独的上下文。

因此,您正在主进程本地创建的池对象上调用 join(),并在池上调用 close()。然后,在 run() 中,当调用 start() 时,另一个池对象被克隆到子进程中,并且该池从未关闭,无法以您正在执行的方式连接。简而言之,主进程没有对子进程中克隆的池对象的引用。

如果我将 Test 作为线程的子类,这工作正常。线程代替 的多处理。过程。我真的不确定为什么这会破坏它 半。

这是有道理的,因为线程与进程的不同之处在于它们具有独立的调用堆栈,但共享其他内存段,因此您对在另一个线程中创建的对象所做的任何更新在主进程(这些线程的父进程)中都是可见的,反之亦然。

解决方法是创建 run() 函数本地的池对象。关闭子流程上下文中的池对象,并在主流程中加入子流程。这把我们带到了#2...

  1. 共享状态:有这些多处理。Manager() 对象,允许进程之间实现某种神奇的进程安全共享状态。管理器似乎不允许重新分配对象引用,这是有道理的,因为如果您在子流程中重新分配托管值,当子流程终止时,该进程上下文(代码、堆栈、堆)将消失,并且您的主进程永远不会看到此分配(因为它是引用子流程上下文的本地对象完成的)。不过,它可能适用于 ctype 基元值。

如果对 Manager() 更有经验的人想插话,那就太酷了。但是,以下代码为您提供了预期的行为:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process, Manager
from sys import stdout
class Quest():
def __init__(self):
pass
def doIt(self, i):
return i
class Test(Process):
def __init__(self, arg, shared):
super(Test, self).__init__()
self.arg = arg
self.quest = Quest()
self.shared = shared
def run(self):
with Pool() as pool:
iterable = list(range(10))
self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
print("1" + str(self.shared)) # can remove, just to make sure we've updated state
def __str__(self):
return str(self.arg)
with Manager() as manager:
res = manager.list()
test = Test("test", res)
print(test) # should print 'test' (and does)
test.start()
test.join()
print("2" + str(res)) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

输出:

rpg711$ python multiprocess_async_join.py 
test
1[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

相关内容

  • 没有找到相关文章

最新更新