Reliably getting split queues to deliver objects where I want them in Python



I'm building a test harness. The tests will operate on large datasets (hundreds of gigabytes) and will run in separate processes. I'd rather not duplicate the data on disk for each process to use, so I read it once in the harness and push it out to the tests.

That was the plan, but when the tests don't all run on the same data in the same format, they end up receiving each other's data. I assumed I was accidentally reusing the same queue, but it doesn't look like that's the case.

Tested with Python 3.6 on Red Hat 6.7 and on Windows 7.

from multiprocessing import Manager
from threading import Thread
from concurrent.futures import ProcessPoolExecutor

class QueueSplitter(object):
    def __init__(self, queues=[]):
        self.queues = queues
    def append(self, q):
        self.queues.append(q)
    def put(self, obj):
        for q in self.queues:
            q.put(obj)
    def close(self):
        self.queues = []
        self.done = True

class IterQueueSplitter(QueueSplitter):
    def __init__(self, it, sentinel=None, queues=[]):
        self.it = it
        self.sentinel = sentinel
        self.queues = queues
    def send(self):
        try:
            self.put(next(self.it))
        except StopIteration:
            self.put(self.sentinel)
            self.close()

def serve(server):
    while not hasattr(server, 'done'):
        server.send()

def consume(me, q):
    for v in iter(q.get, None):
        print('consumer %d: %d' % (me, v))

def repeatabunch(n):
    for i in range(100):
        yield n

if __name__ == '__main__':
    with Manager() as man, ProcessPoolExecutor(4) as ex:
        consumers = []
        producers = []
        servers = []
        for i in range(8):
            queue = man.Queue()
            consumer = ex.submit(consume, i, queue)
            consumers.append(consumer)
            server = IterQueueSplitter(repeatabunch(i))
            server.append(queue)
            servers.append(server)
            producers.append(Thread(target=serve, args=[server]))
        for t in producers:
            t.start()
        for t in producers:
            t.join()
        for consumer in consumers:
            consumer.result()

How do I get the objects to stay in the queue they were put into?

As it turns out, I had forgotten how default arguments work. queues=[] does not create a new list every time the method is called; the list is created once, when the function is defined, and reused on every call, which means everything ends up going into every queue.
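
Here is a minimal, self-contained sketch (the Splitter class here is just a stripped-down stand-in for illustration, not the harness class above) showing how a mutable default argument ends up shared across instances:

class Splitter(object):
    def __init__(self, queues=[]):  # the default list is built once, at definition time
        self.queues = queues

a = Splitter()
b = Splitter()
a.queues.append('meant only for a')
print(b.queues)              # ['meant only for a']
print(a.queues is b.queues)  # True -- both instances share the one default list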

The corrected class definitions look like this:

class QueueSplitter(object):
    def __init__(self, queues=None):
        self.queues = queues or []
    def append(self, q):
        self.queues.append(q)
    def put(self, obj):
        for q in self.queues:
            q.put(obj)
    def close(self):
        self.queues = []
        self.done = True

class IterQueueSplitter(QueueSplitter):
    def __init__(self, it, sentinel=None, queues=None):
        self.it = it
        self.sentinel = sentinel
        self.queues = queues or []
    def send(self):
        try:
            self.put(next(self.it))
        except StopIteration:
            self.put(self.sentinel)
            self.close()
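
As an aside, queues or [] also swaps out an explicitly passed empty list for a fresh one. If you ever want a caller-supplied list (even an empty one) to be kept, a slightly more explicit variant (a sketch, not part of the fix above) only substitutes when no argument is given:

class QueueSplitter(object):
    def __init__(self, queues=None):
        # fall back to a new list only when nothing was passed in;
        # a caller-supplied list, empty or not, is kept as-is
        self.queues = queues if queues is not None else []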
