Python multiprocessing Queue subclass loses its attribute in a new process



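I am trying to implement a subclass of Python's multiprocessing Queue. The subclass holds a simple boolean flag, "ready". When I pass the queue to a new process, the ready attribute disappears. The following code demonstrates the problem: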

import multiprocessing
import multiprocessing.queues

class ReadyQueue(multiprocessing.queues.Queue):
    def __init__(self, ctx, *args, **kwargs):
        super(ReadyQueue, self).__init__(ctx=ctx, *args, **kwargs)
        self.ready = False

def ready_queue(*args, **kwargs):
    return ReadyQueue(ctx=multiprocessing.get_context(), *args, **kwargs)

def foo(q):
    print(q.ready)

if __name__ == "__main__":
    my_queue = ready_queue()
    print(my_queue.ready)
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    p.join()

Output:

False
Process Process-1:
Traceback (most recent call last):
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\acre018\github\EIT_Qt\Experiments\ready_queue_test.py", line 16, in foo
    print(q.ready)
AttributeError: 'ReadyQueue' object has no attribute 'ready'

I implemented this workaround:

import multiprocessing
from queue import Empty
import time
import ctypes

class ReadyQueue:
    def __init__(self, *args, **kwargs):
        self.queue = multiprocessing.Queue(*args, **kwargs)
        self._ready = multiprocessing.Value(ctypes.c_bool, False)
    def set_ready(self):
        self._ready.value = True
    def set_not_ready(self):
        self._ready.value = False
        self.clear()
    def is_ready(self):
        return self._ready.value
    def clear(self):
        try:
            while True:
                self.queue.get(block=False)
        except Empty:
            pass
    def get(self, block=True, timeout=None):
        return self.queue.get(block, timeout)
    def put(self, obj, block=True, timeout=None):
        return self.queue.put(obj, block, timeout)
    def full(self):
        return self.queue.full()
    def empty(self):
        return self.queue.empty()
    def qsize(self):
        return self.queue.qsize()

def foo(q):
    while q.is_ready():
        time.sleep(1)
        q.put("hello from foo")
    print("q no longer ready, foo loop finished")

if __name__ == "__main__":
    my_queue = ReadyQueue()
    my_queue.set_ready()
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    for i in range(2):
        print(my_queue.get())
        time.sleep(2)
    print("my_queue._ready = %s, qsize: %d. Setting not ready.." % (str(my_queue.is_ready()), my_queue.qsize()))
    my_queue.set_not_ready()
    print("my_queue._ready = %s, qusize: %d" % (str(my_queue.is_ready()), my_queue.qsize()))

Output:

C:\Users\acre018\Anaconda3\envs\test_pyqt\python.exe C:/Users/acre018/github/EIT_Qt/Experiments/ready_queue_test2.py
hello from foo
hello from foo
my_queue._ready = True, qsize: 2. Setting not ready..
my_queue._ready = False, qusize: 0
q no longer ready, foo loop finished
Process finished with exit code 0

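The workaround is to make my ReadyQueue class hold the queue as an attribute instead of inheriting from multiprocessing.queues.Queue. For convenience, I implemented the Queue methods I need; they simply delegate to the queue attribute. I also added a clear method.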

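Note that in my first example I neglected to make self.ready a multiprocessing.Value, so it could not be modified across processes. I tested again after fixing that, and it is not the source of the problem.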
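A likely explanation, assuming CPython's implementation: multiprocessing.queues.Queue defines its own __getstate__ and __setstate__, and these pickle only a fixed tuple of internal fields. When the queue is pickled for a spawned process (the default start method on Windows), any attribute added by a subclass is left out. The sketch below shows how the inheritance-based version might be kept by extending those two methods. The set_ready and is_ready names are borrowed from the workaround above; the `_ready` attribute and the keyword-only `ctx` argument are choices made for this illustration, not anything the library requires.

```python
import ctypes
import multiprocessing
import multiprocessing.queues


class ReadyQueue(multiprocessing.queues.Queue):
    """Queue subclass whose extra `_ready` flag survives pickling."""

    def __init__(self, maxsize=0, *, ctx=None):
        ctx = ctx if ctx is not None else multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        # A shared Value, so a change made in one process is seen in the others.
        self._ready = ctx.Value(ctypes.c_bool, False)

    def __getstate__(self):
        # Wrap the base class's state and carry the extra attribute along.
        return (super().__getstate__(), self._ready)

    def __setstate__(self, state):
        base_state, self._ready = state
        super().__setstate__(base_state)

    def set_ready(self, flag=True):
        self._ready.value = flag

    def is_ready(self):
        return self._ready.value


def foo(q):
    # Runs in the child process; the attribute should now be present.
    print("child sees ready =", q.is_ready())


if __name__ == "__main__":
    my_queue = ReadyQueue()
    my_queue.set_ready()
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    p.join()
```

With the fork start method the child inherits the object without pickling it, so the original subclass would not lose the attribute there; the override only matters when the queue is actually pickled, as with spawn.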
