Inheritance in an iterable implementation of Python's multiprocessing.Queue



I find the default implementation of Python's multiprocessing.Queue lacking, in that it is not iterable like any other collection. So I went about the effort of creating a "subclass" of it that adds the feature. As you can see from the code below, it isn't a proper subclass, because multiprocessing.Queue is not a direct class itself but a factory function, and the real underlying class is multiprocessing.queues.Queue. I don't have the understanding, nor the effort to spare, to mimic the factory function just so I can inherit from the class properly, so I simply had the new class create its own instance from the factory and treat it as the superclass. Here is the code:

from multiprocessing import Queue, Value, Lock
import queue


class QueueClosed(Exception):
    pass


class IterableQueue:
    def __init__(self, maxsize=0):
        self.closed = Value('b', False)
        self.close_lock = Lock()
        self.queue = Queue(maxsize)

    def close(self):
        with self.close_lock:
            self.closed.value = True
            self.queue.close()

    def put(self, elem, block=True, timeout=None):
        with self.close_lock:
            if self.closed.value:
                raise QueueClosed()
            else:
                self.queue.put(elem, block, timeout)

    def put_nowait(self, elem):
        self.put(elem, False)

    def get(self, block=True):
        if not block:
            return self.queue.get_nowait()
        elif self.closed.value:
            # queue already closed: drain whatever is left, then report exhaustion
            try:
                return self.queue.get_nowait()
            except queue.Empty:
                return None
        else:
            # busy-wait until an item arrives or the queue is closed
            val = None
            while not self.closed.value:
                try:
                    val = self.queue.get_nowait()
                    break
                except queue.Empty:
                    pass
            return val

    def get_nowait(self):
        return self.queue.get_nowait()

    def join_thread(self):
        return self.queue.join_thread()

    def __iter__(self):
        return self

    def __next__(self):
        val = self.get()
        if val is None:
            raise StopIteration()
        else:
            return val

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

This lets me instantiate an IterableQueue object just like a normal multiprocessing.Queue, put elements into it as usual, and then, inside child consumers, simply loop over it like this:

from iterable_queue import IterableQueue
from multiprocessing import Process, cpu_count
import os


def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)


def consumer(queue):
    print(f"[{os.getpid()}] Consuming")
    for i in queue:
        print(f"[{os.getpid()}] < {i}")
        n = fib(i)
        print(f"[{os.getpid()}] {i} > {n}")
    print(f"[{os.getpid()}] Closing")


def producer():
    print("Enqueueing")
    with IterableQueue() as queue:
        procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
        [p.start() for p in procs]
        [queue.put(i) for i in range(36)]
    print("Finished")


if __name__ == "__main__":
    producer()

This works almost seamlessly: the consumers exit the loop once the queue is closed, but only after exhausting all remaining elements. However, I was unsatisfied with the lack of inherited methods. In an attempt to mimic actual inheritance behaviour, I tried adding the following meta function call to the class:

def __getattr__(self, name):
    if name in self.__dict__:
        return self.__dict__[name]
    else:
        # delegate any unknown attribute to the wrapped queue
        return getattr(self.queue, name)

However, this fails when instances of the IterableQueue class are manipulated inside child multiprocessing.Process threads, because the class's __dict__ attribute is not preserved within them. I attempted to remedy this in a hacky manner by replacing the class's default __dict__ with a multiprocessing.Manager().dict(), like so:

def __init__(self, maxsize=0):
    self.closed = Value('b', False)
    self.close_lock = Lock()
    self.queue = Queue(maxsize)
    self.__dict__ = Manager().dict(self.__dict__)

However, doing so produces an error stating RuntimeError: Synchronized objects should only be shared between processes through inheritance. So my question is: how should I properly inherit from the Queue class so that the subclass has inherited access to all of its attributes? In addition, while the queue is empty but not closed, the consumers all sit in a busy loop instead of a true IO block, taking up valuable CPU resources. If you have any suggestions about the concurrency and race-condition problems I'm likely to run into with this code, or about how I might solve the busy-loop issue, I would welcome those as well.
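For illustration only, one way the busy loop above could be softened is to fall back to the wrapped queue's blocking get with a short timeout; the 0.1-second value is an arbitrary choice, and this is only a sketch of the idea, not the approach ultimately used below:

def get(self, block=True):
    if not block:
        return self.queue.get_nowait()
    # block inside the underlying queue instead of spinning,
    # waking periodically to check whether close() has been called
    while not self.closed.value:
        try:
            return self.queue.get(block=True, timeout=0.1)
        except queue.Empty:
            pass
    # queue closed: drain any leftovers, then signal exhaustion with None
    try:
        return self.queue.get_nowait()
    except queue.Empty:
        return None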


Based on the code provided by MisterMiyagi, I created this generic IterableQueue class, which accepts arbitrary input, blocks properly, and does not hang on queue close:

from multiprocessing.queues import Queue
from multiprocessing import get_context


class QueueClosed(Exception):
    pass


class IterableQueue(Queue):
    def __init__(self, maxsize=0, *, ctx=None):
        super().__init__(
            maxsize=maxsize,
            ctx=ctx if ctx is not None else get_context()
        )

    def close(self):
        # enqueue a (value, is_open) marker so consumers know the queue is done
        super().put((None, False))
        super().close()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.get()
        except QueueClosed:
            raise StopIteration

    def get(self, *args, **kwargs):
        result, is_open = super().get(*args, **kwargs)
        if not is_open:
            # re-queue the close marker for the other consumers
            super().put((None, False))
            raise QueueClosed
        return result

    def put(self, val, *args, **kwargs):
        super().put((val, True), *args, **kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
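For reference, a minimal sketch of how the earlier producer/consumer example can drive this version, assuming the class above is saved as iterable_queue.py and reusing the fib workload unchanged:

from iterable_queue import IterableQueue
from multiprocessing import Process, cpu_count
import os


def fib(n):
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)


def consumer(queue):
    # get() blocks in the underlying queue; iteration stops once the close marker arrives
    for i in queue:
        print(f"[{os.getpid()}] {i} -> {fib(i)}")


if __name__ == "__main__":
    with IterableQueue() as queue:
        procs = [Process(target=consumer, args=(queue,)) for _ in range(cpu_count())]
        for p in procs:
            p.start()
        for i in range(36):
            queue.put(i)
    # leaving the with-block calls close(), which enqueues the close marker
    for p in procs:
        p.join()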

For context, here is the answer from MisterMiyagi that the above is based on. The multiprocessing.Queue wrapper only serves to use the default context:

def Queue(self, maxsize=0):
    '''Returns a queue object'''
    from .queues import Queue
    return Queue(maxsize, ctx=self.get_context())
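A quick interactive check illustrates the relationship: the factory returns an instance of multiprocessing.queues.Queue, which is therefore the class to inherit from:

import multiprocessing
import multiprocessing.queues

q = multiprocessing.Queue()  # calls the factory method on the default context
print(type(q))                                       # <class 'multiprocessing.queues.Queue'>
print(isinstance(q, multiprocessing.queues.Queue))   # True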

When inheriting, you can replicate this factory call in your own __init__ method. This lets you inherit the entire Queue behaviour; all you need to add are the iterator methods:

import time

from multiprocessing.queues import Queue
from multiprocessing import get_context


class IterableQueue(Queue):
    """
    ``multiprocessing.Queue`` that can be iterated to ``get`` values

    :param sentinel: signal that no more items will be received
    """
    def __init__(self, maxsize=0, *, ctx=None, sentinel=None):
        self.sentinel = sentinel
        super().__init__(
            maxsize=maxsize,
            ctx=ctx if ctx is not None else get_context()
        )

    def close(self):
        self.put(self.sentinel)
        # wait until buffer is flushed...
        while self._buffer:
            time.sleep(0.01)
        # ...before shutting down the sender
        super().close()

    def __iter__(self):
        return self

    def __next__(self):
        result = self.get()
        if result == self.sentinel:
            # re-queue sentinel for other listeners
            self.put(result)
            raise StopIteration
        return result

Note that the sentinel which signals the end of the queue is compared by equality, because identity is not preserved across processes. The often-used queue.Queue sentinel object() therefore does not work properly here.
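A small illustration of that point: an object() sentinel loses both its identity and its (identity-based) equality once it has been pickled across a process boundary, whereas a value such as None still compares equal:

import pickle

sentinel = object()
restored = pickle.loads(pickle.dumps(sentinel))   # roughly what a child process receives
print(restored is sentinel)    # False: identity is not preserved
print(restored == sentinel)    # False: object() equality falls back to identity
print(pickle.loads(pickle.dumps(None)) == None)   # True: None still works as a sentinel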
