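I am building a test harness. The tests will operate on large datasets (hundreds of gigabytes), and they will run in separate processes. I would rather not duplicate the data on disk for each process, so I read it once in the harness and push it to the tests through queues.
That is the plan, but when the tests do not all run on the same data in the same format, they end up receiving each other's data. I thought I was inadvertently using the same queue everywhere, but it does not look like it.
Tested on Python 3.6 on Redhat 6.7 and Windows 7.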
from multiprocessing import Manager
from threading import Thread
from concurrent.futures import ProcessPoolExecutor

class QueueSplitter(object):
    def __init__(self, queues=[]):
        self.queues = queues

    def append(self, q):
        self.queues.append(q)

    def put(self, obj):
        for q in self.queues:
            q.put(obj)

    def close(self):
        self.queues = []
        self.done = True

class IterQueueSplitter(QueueSplitter):
    def __init__(self, it, sentinel=None, queues=[]):
        self.it = it
        self.sentinel = sentinel
        self.queues = queues

    def send(self):
        try:
            self.put(next(self.it))
        except StopIteration:
            self.put(self.sentinel)
            self.close()

def serve(server):
    while not hasattr(server, 'done'):
        server.send()

def consume(me, q):
    for v in iter(q.get, None):
        print('consumer %d: %d' % (me, v))

def repeatabunch(n):
    for i in range(100):
        yield n

if __name__ == '__main__':
    with Manager() as man, ProcessPoolExecutor(4) as ex:
        consumers = []
        producers = []
        servers = []
        for i in range(8):
            queue = man.Queue()
            consumer = ex.submit(consume, i, queue)
            consumers.append(consumer)
            server = IterQueueSplitter(repeatabunch(i))
            server.append(queue)
            servers.append(server)
            producers.append(Thread(target=serve, args=[server]))
        for t in producers:
            t.start()
        for t in producers:
            t.join()
        for consumer in consumers:
            consumer.result()
How do I get each object to keep its own queues?
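It turns out I had forgotten how default arguments work. queues=[] does not create a new list each time the method is called. The list is created once, when the function is defined, and that same list is reused on every call that omits the argument. Every splitter therefore shared one list, so every item went into every queue.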
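The sharing can be seen in isolation with a small sketch. The Holder class below is a hypothetical stand-in for the splitter, not part of the harness; it only stores the list it is given.

```python
class Holder:
    # Hypothetical stand-in: the default list is built once, at definition time.
    def __init__(self, queues=[]):
        self.queues = queues

a = Holder()
b = Holder()
a.queues.append("queue-a")
b.queues.append("queue-b")

# Both instances hold the very same list object.
print(a.queues is b.queues)           # True
print(b.queues)                       # ['queue-a', 'queue-b']

# The shared list lives on the function object itself.
print(Holder.__init__.__defaults__)   # (['queue-a', 'queue-b'],)
```

Because append mutates the list in place, each new instance sees everything earlier instances added.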
The corrected class definitions look like this:
class QueueSplitter(object):
    def __init__(self, queues=None):
        # A fresh list per instance unless the caller supplies one.
        self.queues = queues or []

    def append(self, q):
        self.queues.append(q)

    def put(self, obj):
        for q in self.queues:
            q.put(obj)

    def close(self):
        self.queues = []
        self.done = True

class IterQueueSplitter(QueueSplitter):
    def __init__(self, it, sentinel=None, queues=None):
        self.it = it
        self.sentinel = sentinel
        # Same fix here: never share a default list between instances.
        self.queues = queues or []

    def send(self):
        try:
            self.put(next(self.it))
        except StopIteration:
            self.put(self.sentinel)
            self.close()
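One way to check the fix without starting any processes is sketched below. It assumes plain queue.Queue objects in place of the manager queues, and the drain helper is made up for this example. All splitters are created before any of them sends, which is the situation in which the shared default list mixed the data.

```python
import queue

class QueueSplitter(object):
    def __init__(self, queues=None):
        self.queues = queues or []
    def append(self, q):
        self.queues.append(q)
    def put(self, obj):
        for q in self.queues:
            q.put(obj)
    def close(self):
        self.queues = []
        self.done = True

class IterQueueSplitter(QueueSplitter):
    def __init__(self, it, sentinel=None, queues=None):
        self.it = it
        self.sentinel = sentinel
        self.queues = queues or []
    def send(self):
        try:
            self.put(next(self.it))
        except StopIteration:
            self.put(self.sentinel)
            self.close()

def drain(q):
    # Hypothetical helper: collect items until the None sentinel.
    return list(iter(q.get, None))

servers, queues = [], []
for i in range(3):
    q = queue.Queue()
    server = IterQueueSplitter(iter([i] * 4))
    server.append(q)
    servers.append(server)
    queues.append(q)

for server in servers:
    while not hasattr(server, 'done'):
        server.send()

results = [drain(q) for q in queues]
print(results)  # [[0, 0, 0, 0], [1, 1, 1, 1], [2, 2, 2, 2]]
```

One caveat about queues or []: it also replaces an empty list that the caller passed explicitly, so anything the caller later appends to that list will not reach the splitter. Writing the check as "if queues is None" avoids that, if it matters for your use.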