我已经与这个问题斗争了一段时间,我终于设法缩小了这个问题的范围,并创建了一个最低限度的工作示例。
问题的总结是,我有一个从dict
继承的类,以便于解析misc。输入文件。我已经重写了__setitem__
调用,以支持对输入文件中的节进行递归索引(例如,parser['some.section.variable']
等效于parser['some']['section']['variable']
(。一年多来,这对我们来说一直很有效,但我们只是在通过multiprocessing.apply_async
调用传递这些Parser
类时遇到了一个问题。
下面显示的是一个最低限度的工作示例-很明显,__setitem__
调用没有做任何特殊的事情,但它访问一些类属性(如self.section_delimiter
(是很重要的-这就是它中断的地方。它不会在初始调用或串行函数调用中中断。但是,当您使用apply_async
调用some_function
(它也不做任何事情(时,它会崩溃。
import multiprocessing as mp
import numpy as np
class Parser(dict):
def __init__(self, file_name : str = None):
print('t__init__')
super().__init__()
self.section_delimiter = "."
def __setitem__(self, key, value):
print('t__setitem__')
self.section_delimiter
dict.__setitem__(self, key, value)
def some_function(parser):
pass
if __name__ == "__main__":
print("Initialize creation/setting")
parser = Parser()
parser['x'] = 1
print("Single serial call works fine")
some_function(parser)
print("Parallel async call breaks on line 16?")
pool = mp.Pool(1)
for i in range(1):
pool.apply_async(some_function, (parser,))
pool.close()
pool.join()
如果你运行下面的代码,你会得到以下输出
Initialize creation/setting
__init__
__setitem__
Single serial call works fine
Parallel async call breaks on line 16?
__setitem__
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/queues.py", line 354, in get
return _ForkingPickler.loads(res)
File "test_apply_async.py", line 13, in __setitem__
self.section_delimiter
AttributeError: 'Parser' object has no attribute 'section_delimiter'
非常感谢您的帮助。我花了相当多的时间来追踪这个bug,并复制了一个最小的例子。我不仅想修复它,而且想清楚地填补我对这些apply_async
和继承/重写方法如何交互的理解中的一些空白。
如果你需要更多信息,请告诉我。
非常感谢!
Isaac
原因
问题的原因是multiprocessing
序列化和反序列化Parser
对象,以跨进程边界移动其数据。这是用泡菜做的。默认情况下,pickle在反序列化类时不调用__init__()
。因此,当反序列化程序调用__setitem__()
来恢复字典中的项时,self.section_delimiter
没有设置,并且您会得到错误:
只使用pickle而不使用多处理会产生相同的错误:
import pickle
parser = Parser()
parser['x'] = 1
data = pickle.dumps(parser)
copy = pickle.loads(data) # Same AttributeError here
反序列化将适用于没有项目的对象,并且section_delimiter
的值将被恢复:
import pickle
parser = Parser()
parser.section_delimiter = "|"
data = pickle.dumps(parser)
copy = pickle.loads(data)
print(copy.section_delimiter) # Prints "|"
因此,从某种意义上说,pickle在恢复Parser
的其余状态之前调用了__setitem__()
,这只是运气不好。
变通办法
您可以通过在__new__()
中设置section_delimiter
并通过实现__getnewargs__()
:告诉pickle要传递给__new__()
的参数来解决此问题
def __new__(cls, *args):
self = super(Parser, cls).__new__(cls)
self.section_delimiter = args[0] if args else "."
return self
def __getnewargs__(self):
return (self.section_delimiter,)
__getnewargs__()
返回一个参数元组。因为section_delimiter
是在__new__()
中设置的,所以不再需要在__init__()
中设置它。
这是更改后您的Parser
类的代码:
class Parser(dict):
def __init__(self, file_name : str = None):
print('t__init__')
super().__init__()
def __new__(cls, *args):
self = super(Parser, cls).__new__(cls)
self.section_delimiter = args[0] if args else "."
return self
def __getnewargs__(self):
return (self.section_delimiter,)
def __setitem__(self, key, value):
print('t__setitem__')
self.section_delimiter
dict.__setitem__(self, key, value)
更简单的解决方案
pickle在Parser
对象上调用__setitem__()
的原因是因为它是字典。如果您的Parser
只是一个恰好实现__setitem__()
和__getitem__()
的类,并且有字典来实现这些调用,则pickle将不会调用__setitem__()
,并且序列化将在没有额外代码的情况下工作:
class Parser:
def __init__(self, file_name : str = None):
print('t__init__')
self.dict = { }
self.section_delimiter = "."
def __setitem__(self, key, value):
print('t__setitem__')
self.section_delimiter
self.dict[key] = value
def __getitem__(self, key):
return self.dict[key]
因此,如果您的Parser
是一本字典没有其他原因,我就不会在这里使用继承。