我正在使用python 3.7的多处理库和一个通过队列与子进程通信的父进程。我可以很好地使用multiprocessing.queue.Queue,但是当我子类向队列添加一些额外的功能时,它们在父级中设置得很好,但在孩子访问时似乎丢失了。知道我如何让属性在孩子身上持久存在吗?
import sys
import time
import multiprocessing
import multiprocessing.queues
# Subclass multiprocessing.queue.Queue to add some useful features
class q_class(multiprocessing.queues.Queue):
def __init__(self):
self.report_bottlenecks = False
self.bottleneck_time = 1.0 #in seconds
super(q_class, self).__init__(ctx=multiprocessing.get_context())
def put(self,header,payload=None):
message = {}
message['header'] = header
message['payload'] = payload
message['put_time'] = time.time()
super(q_class, self).put(message)
def get(self):
message = super(q_class, self).get()
message['get_time'] = time.time()
message['queue_time'] = message['get_time'] - message['put_time']
if self.report_bottlenecks:
if message['queue_time']>self.bottleneck_time:
self.debug.print('Queue bottleneck: '+str(int(message['queue_time']))+' seconds.')
return(message)
def function_for_child_to_run(the_q):
print('Child process started')
print(the_q.report_bottlenecks) #currently causes an AttributeError: 'q_class' object has no attribute 'report_bottlenecks'
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver')
my_q = q_class()
print(my_q.report_bottlenecks) #should print "False"
process = multiprocessing.Process( target=function_for_child_to_run , args=(my_q,) )
process.start()
time.sleep(2) #give the child time to start & print
process.join()
sys.exit()
为了与forkserver一起使用,你的队列必须被腌制。multiprocessing.queues.Queue
定义了用于酸洗和取消酸取实例属性的__getstate__
和__setstate__
方法。将属性添加到子类化队列时,必须使用自定义属性扩展这两个方法,否则它们将丢失。
将此添加到您的代码中:
from multiprocessing import context
...
class QClass(multiprocessing.queues.Queue):
...
def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.report_bottlenecks, self.bottleneck_time) # <---
def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.report_bottlenecks, self.bottleneck_time) = state # <---
self._after_fork()