我正在尝试使用多处理来生成复杂的、不可选择的对象,如下面的代码片段所示:
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool
class Facility:
def __init__(self):
self.blocks = Manager().list()
def __process_blocks(self, block):
designer = block["designer"]
apply_terrain = block["terrain"]
block_type = self.__block_type_to_string(block["type"])
block = designer.generate_block(block_id=block["id"],
block_type=block_type,
anchor=Point(float(block["anchor_x"]), float(block["anchor_y"]),
float(block["anchor_z"])),
pcu_anchor=Point(float(block["pcu_x"]), float(block["pcu_y"]), 0),
corridor_width=block["corridor"],
jb_height=block["jb_connect_height"],
min_boxes=block["min_boxes"],
apply_terrain=apply_terrain)
self.blocks.append(block)
def design(self, apply_terrain=False):
designer = FacilityBuilder(string_locator=self._string_locator, string_router=self._string_router,
box_router=self._box_router, sorter=self._sorter,
tracker_configurator=self._tracker_configurator, config=self._config)
blocks = [block.to_dict() for index, block in self._store.get_blocks().iterrows()]
for block in blocks:
block["designer"] = designer
block["terrain"] = apply_terrain
with ProcessingPool() as pool:
pool.map(self.__process_blocks, blocks)
(努力用更简单的代码重现这一点,所以我显示实际的代码)
我需要更新一个可共享的变量,所以我使用multiprocessing.Manager
初始化一个类级变量,如下所示:
self.blocks = Manager().list()
这给我留下了以下错误(只是部分stacktrace):
File "C:UsersPaul.NelDocumentsreposautoPV.autopvlibsite-packagesdill_dill.py", line 481, in load
obj = StockUnpickler.load(self)
File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingmanagers.py", line 933, in RebuildProxy
return func(token, serializer, incref=incref, **kwds)
File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingmanagers.py", line 783, in __init__
self._incref()
File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingmanagers.py", line 837, in _incref
conn = self._Client(self._token.address, authkey=self._authkey)
File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingconnection.py", line 513, in Client
answer_challenge(c, authkey)
File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingconnection.py", line 764, in answer_challe
nge
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
作为最后的手段,我试图使用python
的标准ThreadPool
实现来试图规避pickle
问题,但这也不顺利。我读过很多类似的问题,但还没有找到解决这个问题的方法。是dill
的问题还是pathos
与mulitprocessing.Manager
的接口方式的问题?
编辑:所以我设法复制这个样例代码如下:
import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool
class MyComplex:
def __init__(self, x):
self._z = x * x
def me(self):
return math.sqrt(self._z)
class Starter:
def __init__(self):
manager = Manager()
self.my_list = manager.list()
def _f(self, value):
print(f"{value.me()} on {os.getpid()}")
self.my_list.append(value.me)
def start(self):
names = [MyComplex(x) for x in range(100)]
with ProcessingPool() as pool:
pool.map(self._f, names)
if __name__ == '__main__':
starter = Starter()
starter.start()
当我添加self.my_list = manager.list()
时出现错误。
所以我已经解决了这个问题。如果有人喜欢mmckerns或其他比我更了解多处理的人可以评论为什么这是一个解决方案,我仍然会很棒。
问题似乎是Manager().list()
在__init__
中声明了。下面的代码可以正常工作,没有任何问题:
import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool
class MyComplex:
def __init__(self, x):
self._z = x * x
def me(self):
return math.sqrt(self._z)
class Starter:
def _f(self, value):
print(f"{value.me()} on {os.getpid()}")
return value.me()
def start(self):
manager = Manager()
my_list = manager.list()
names = [MyComplex(x) for x in range(100)]
with ProcessingPool() as pool:
my_list.append(pool.map(self._f, names))
print(my_list)
if __name__ == '__main__':
starter = Starter()
starter.start()
这里我声明list
是ProcessingPool
操作的局部。如果我愿意,我可以在之后将结果赋值给类级别变量。