重写的__setitem__调用以串行方式工作,但在apply_async调用中中断



我已经与这个问题斗争了一段时间,我终于设法缩小了这个问题的范围,并创建了一个最低限度的工作示例。

问题的总结是,我有一个从dict继承的类,以便于解析misc。输入文件。我已经重写了__setitem__调用,以支持对输入文件中的节进行递归索引(例如,parser['some.section.variable']等效于parser['some']['section']['variable'](。一年多来,这对我们来说一直很有效,但我们只是在通过multiprocessing.apply_async调用传递这些Parser类时遇到了一个问题。

下面显示的是一个最低限度的工作示例-很明显,__setitem__调用没有做任何特殊的事情,但它访问一些类属性(如self.section_delimiter(是很重要的-这就是它中断的地方。它不会在初始调用或串行函数调用中中断。但是,当您使用apply_async调用some_function(它也不做任何事情(时,它会崩溃。

import multiprocessing as mp
import numpy as np
class Parser(dict):
def __init__(self, file_name : str = None):
print('t__init__')
super().__init__()
self.section_delimiter = "."

def __setitem__(self, key, value):
print('t__setitem__')
self.section_delimiter
dict.__setitem__(self, key, value)

def some_function(parser):
pass
if __name__ == "__main__":
print("Initialize creation/setting")
parser = Parser()
parser['x'] = 1
print("Single serial call works fine")
some_function(parser)
print("Parallel async call breaks on line 16?")
pool = mp.Pool(1)
for i in range(1):
pool.apply_async(some_function, (parser,))
pool.close()
pool.join()

如果你运行下面的代码,你会得到以下输出

Initialize creation/setting
__init__
__setitem__
Single serial call works fine
Parallel async call breaks on line 16?
__setitem__
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/home/ijw/miniconda3/lib/python3.7/multiprocessing/queues.py", line 354, in get
return _ForkingPickler.loads(res)
File "test_apply_async.py", line 13, in __setitem__
self.section_delimiter
AttributeError: 'Parser' object has no attribute 'section_delimiter'

非常感谢您的帮助。我花了相当多的时间来追踪这个bug,并复制了一个最小的例子。我不仅想修复它,而且想清楚地填补我对这些apply_async和继承/重写方法如何交互的理解中的一些空白。

如果你需要更多信息,请告诉我。

非常感谢!

Isaac

原因

问题的原因是multiprocessing序列化和反序列化Parser对象,以跨进程边界移动其数据。这是用泡菜做的。默认情况下,pickle在反序列化类时不调用__init__()。因此,当反序列化程序调用__setitem__()来恢复字典中的项时,self.section_delimiter没有设置,并且您会得到错误:

属性错误:"Parser"对象没有属性"section_delimiter"

只使用pickle而不使用多处理会产生相同的错误:

import pickle
parser = Parser()
parser['x'] = 1
data = pickle.dumps(parser)
copy = pickle.loads(data) # Same AttributeError here

反序列化将适用于没有项目的对象,并且section_delimiter的值将被恢复:

import pickle
parser = Parser()
parser.section_delimiter = "|"
data = pickle.dumps(parser)
copy = pickle.loads(data)
print(copy.section_delimiter) # Prints "|"

因此,从某种意义上说,pickle在恢复Parser的其余状态之前调用了__setitem__(),这只是运气不好。

变通办法

您可以通过在__new__()中设置section_delimiter并通过实现__getnewargs__():告诉pickle要传递给__new__()的参数来解决此问题

def __new__(cls, *args):
self = super(Parser, cls).__new__(cls)
self.section_delimiter = args[0] if args else "."
return self
def __getnewargs__(self):
return (self.section_delimiter,)

__getnewargs__()返回一个参数元组。因为section_delimiter是在__new__()中设置的,所以不再需要在__init__()中设置它。

这是更改后您的Parser类的代码:

class Parser(dict):
def __init__(self, file_name : str = None):
print('t__init__')
super().__init__()
def __new__(cls, *args):
self = super(Parser, cls).__new__(cls)
self.section_delimiter = args[0] if args else "."
return self
def __getnewargs__(self):
return (self.section_delimiter,)

def __setitem__(self, key, value):
print('t__setitem__')
self.section_delimiter
dict.__setitem__(self, key, value)

更简单的解决方案

pickle在Parser对象上调用__setitem__()的原因是因为它字典。如果您的Parser只是一个恰好实现__setitem__()__getitem__()的类,并且字典来实现这些调用,则pickle将不会调用__setitem__(),并且序列化将在没有额外代码的情况下工作:

class Parser:
def __init__(self, file_name : str = None):
print('t__init__')
self.dict = { }
self.section_delimiter = "."
def __setitem__(self, key, value):
print('t__setitem__')
self.section_delimiter
self.dict[key] = value
def __getitem__(self, key):
return self.dict[key]

因此,如果您的Parser是一本字典没有其他原因,我就不会在这里使用继承。

相关内容

最新更新