How to correctly share a Manager dict between processes



What I want to do is share a dictionary between subclasses of Process, and have one process notify the other to use the dictionary whenever it updates it. This is illustrated in the code below: MyProducer starts filling the dictionary, and on every iteration it sets an event to notify MyConsumer to process it. Everything works except that the dictionary is empty in MyConsumer...

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    increment = 0

    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while self.increment < 20:
            self.dictionary[self.increment]=self.increment+10
            self.increment = self.increment + 1
            print("From producer: ", self.dictionary)
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1

class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.dictionary)
            self.event.clear()

if __name__ == "__main__":
    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()

The output is

Process MyProducer-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "main.py", line 13, in run
    self.dictionary[self.increment]=self.increment+10
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

Update

My intent is to understand why the dictionary doesn't work with Process subclasses. I know all the working examples you can find online. In fact, I already have a solution that works fine, simply replacing the dict with a Queue; what I want to understand is why the dict doesn't work.

from multiprocessing import Process, Queue, Event

class MyProducer(Process):
    increment = 0

    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        while self.increment < 20:
            self.queue.put([self.increment,self.increment+10])
            self.increment = self.increment + 1
            print("From producer: ", self.queue.qsize())
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1

class MyConsumer(Process):
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.queue.qsize())
            self.event.clear()

if __name__ == "__main__":
    state_queue = Queue()
    state_ready = Event()
    producerprocess = MyProducer(state_queue, state_ready)
    consumerprocess = MyConsumer(state_queue, state_ready)
    producerprocess.start()
    consumerprocess.start()

FYI, I see a similar death in this much simpler program:

from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        print("at producer start", self.val.value)
        self.val.value = 42
        self.event.set()

class MyConsumer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        self.event.wait()
        print("From consumer: ", self.val.value)

if __name__ == "__main__":
    with Manager() as manager:
        state_value = manager.Value('i', 666)
        state_ready = Event()
        producerprocess = MyProducer(state_value, state_ready)
        consumerprocess = MyConsumer(state_value, state_ready)
        producerprocess.start()
        consumerprocess.start()

What this means is that no kind of object obtained from a Manager() works when it is attached as an attribute to an object that multiprocessing passes "by magic" to a worker process. The information needed to connect to the Manager server process appears to be lost (a socket on Linux-y systems, a named pipe on Windows).

You could open a bug report, but until then there's nothing to be done about it short of rewriting your code so it doesn't use a Manager, or passing the Manager objects explicitly to functions.
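
For reference, here is a minimal sketch of that second alternative, passing the manager dict and event explicitly to plain target functions instead of storing them as attributes of a Process subclass. The function names (produce, consume) are illustrative only, and the workers are joined inside the with Manager() block so the server process stays alive while they run:

from multiprocessing import Process, Manager, Event

def produce(dictionary, event):
    # Fill the shared dict, then signal the consumer.
    for i in range(20):
        dictionary[i] = i + 10
    event.set()

def consume(dictionary, event):
    # Wait for the producer's signal, then read the shared dict.
    event.wait()
    print("From consumer: ", dict(dictionary))

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        ready = Event()
        p = Process(target=produce, args=(shared, ready))
        c = Process(target=consume, args=(shared, ready))
        p.start()
        c.start()
        p.join()   # keep the Manager alive until both workers finish
        c.join()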

A bug report could be resolved in either of two ways: (1) make it "work"; or (2) change the code to raise an exception when an attempt to create such an object is made.

Another possibility (untried): if you only ever run on Linux, you could skip the __name__ == "__main__" test and hope that the Manager connection info survives fork().

Edit

I opened an issue on the Python project's tracker, here:

https://bugs.python.org/issue41660

Workaround

Playing with things on the Python issue report, the "problem" appears to have nothing to do with how things are set up, but with your code neglecting to shut down its workers cleanly. Just adding this one line at the end of your code (the dict version, the one you care about):

producerprocess.join()

is enough so that now, on my box (Win 10, Python 3.8.5), it produces the output you expect. It then hangs forever, though, because your consumer .wait()s forever on an Event that is never set again.
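
Concretely, the __main__ block of the dict version with that one line added would look roughly like this (a sketch of the workaround described above, with nothing else changed):

if __name__ == "__main__":
    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()
        # Block until the producer finishes, so the main process
        # doesn't start tearing down the Manager (and the rest of the
        # multiprocessing machinery) while it's still being used.
        producerprocess.join()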

My guess (I'm about 80% sure it's right): without the .join(), the main process goes on to run interpreter shutdown code (there's nothing left for it to do!), and that starts forcibly destroying things the multiprocessing implementation still needs in order to function correctly.

With the .join(), the main process blocks until the producer has finished, so no shutdown code starts in the interim, and the .join() explicitly tells the producer process to shut down cleanly as part of the (elaborate!) multiprocessing dance.

That may still leave the consumer process in a damaged state, but if so, we'll never see evidence of it, because the consumer is blocked forever on its self.event.wait().

In a real program, you should go to the trouble of shutting down the consumer process cleanly too.

Full code

Here's a complete program illustrating idiomatic Python and best practices for parallel programming: everything shuts down cleanly, there are no "busy loops", no races, and no deadlocks. The implementation of State is more elaborate than this particular problem requires, but it illustrates a powerful approach well worth learning.

import multiprocessing as mp

P, C, F = 1, 2, 4 # bit flags for state values

# Unusual synchronization appears to be wanted here:
# After a producer makes a mutation, it must not make another
# before the consumer acts on it.  So we'll say we're in state
# P when the producer is allowed to mutate, and in state C
# when there's a mutation for the consumer to process.  Another
# state - F (for "finished") - tells the consumer it's time to
# quit. The producer stops on its own when it gets tired of
# mutating ;-)
class State:
    def __init__(self):
        # Initial state is empty - everyone is blocked.
        # Note that we do our own locking around the shared
        # memory, via the condition variable's mutex, so
        # it would be pure waste for the Value to have
        # its own lock too.
        self.state = mp.Value('B', 0, lock=False)
        self.changed = mp.Condition()

    # Wait for state to change to one of the states in the
    # flag mask `what`.  Return the bit flag of the state
    # that succeeded.
    def waitfor(self, what):
        with self.changed:
            while not (self.state.value & what):
                self.changed.wait()
            return self.state.value

    # Force state to (bit flag) `what`, and notify waiters
    # to wake up and see whether it's the state they're
    # waiting for.
    def setwhat(self, what):
        with self.changed:
            self.state.value = what
            self.changed.notify_all()

class Base(mp.Process):
    def __init__(self, dictionary, state):
        super().__init__()
        self.dictionary = dictionary
        self.state = state

class MyProducer(Base):
    def __init__(self, *args):
        super().__init__(*args)
        self.increment = 0

    def run(self):
        while self.increment < 20:
            self.state.waitfor(P)
            self.dictionary[self.increment] = self.increment + 10
            self.state.setwhat(C)
            # Whether the producer or the consumer prints the dict
            # first isn't forced - and, indeed, they can both print at
            # the same time, producing garbled output.  Move the
            # print() above the setwhat(C) to force the producer
            # to print first, if desired.
            print("From producer: ", self.dictionary)
            self.increment += 1

class MyConsumer(Base):
    def run(self):
        while self.state.waitfor(C | F) != F:
            print("From consumer: ", self.dictionary)
            self.state.setwhat(P)

def main():
    with mp.Manager() as manager:
        state_dict = manager.dict()
        state_state = State()
        producerprocess = MyProducer(state_dict, state_state)
        consumerprocess = MyConsumer(state_dict, state_state)
        producerprocess.start()
        consumerprocess.start()

        # The producer is blocked waiting for state P, and the
        # consumer is blocked waiting for state C (or F). The
        # loop here counts down 5 seconds, so you can verify
        # by eyeball that the waits aren't "busy" (they consume
        # essentially no CPU cycles).
        import time
        for i in reversed(range(5)):
            time.sleep(1)
            print(i)

        state_state.setwhat(P) # tell the producer to start!
        producerprocess.join() # and wait for it to finish
        # wait for the consumer to finish eating the last mutation
        state_state.waitfor(P)
        # tell the consumer we're all done
        state_state.setwhat(F)
        consumerprocess.join()

if __name__ == "__main__":
    main()
