Python multiprocessing reads the input iterator all at once



Using Python 3.4.3, I have a generator function foo that yields data to be processed in parallel. Passing this generator to multiprocessing.Pool.map with n processes, I expected the generator to be consumed n items at a time:

from multiprocessing import Pool
import time

now = time.time

def foo(n):
    # Generator producing the items to be processed in parallel
    for i in range(n):
        print("%f get %d" % (now(), i))
        yield i

def bar(i):
    # Worker: simulate one second of processing per item
    print("%f start %d" % (now(), i))
    time.sleep(1)
    print("%f end %d" % (now(), i))

pool = Pool(2)
pool.map(bar, foo(6))
pool.close()
pool.join()

Unfortunately, the generator is consumed all 6 times immediately, before the workers get going. The output:

1440713274.290760 get 0
1440713274.290827 get 1
1440713274.290839 get 2
1440713274.290849 get 3
1440713274.290858 get 4
1440713274.290867 get 5
1440713274.291526 start 0
1440713274.291654 start 1
1440713275.292680 end 0
1440713275.292803 end 1
1440713275.293056 start 2
1440713275.293129 start 3
1440713276.294106 end 2
1440713276.294182 end 3
1440713276.294344 start 4
1440713276.294390 start 5
1440713277.294803 end 4
1440713277.294859 end 5

But what I was hoping for is something more like:

1440714272.612041 get 0
1440714272.612078 get 1
1440714272.612090 start 0
1440714272.612100 start 1
1440714273.613174 end 0
1440714273.613247 end 1
1440714273.613264 get 2
1440714273.613276 get 3
1440714273.613287 start 2
1440714273.613298 start 3
1440714274.614357 end 2
1440714274.614423 end 3
1440714274.614432 get 4
1440714274.614437 get 5
1440714274.614443 start 4
1440714274.614448 start 5
1440714275.615475 end 4
1440714275.615549 end 5

(The reason is that foo will eventually read a large amount of data into memory.)

I get the same behavior with pool.imap(bar, foo(6), 2), and also with:

for i in foo(6):
  pool.apply_async(bar, args=(i,))

What is the simplest way to get the behavior I want?
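For reference, one straightforward way to keep the whole generator from being read up front is to feed the pool one pool-sized chunk at a time. This is only a sketch of that idea, not necessarily the simplest answer; chunks is a helper introduced here, and foo/bar are the functions from the question:

from itertools import islice
from multiprocessing import Pool

def chunks(iterable, size):
    # Yield successive lists of at most `size` items from `iterable`
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

pool_size = 2
pool = Pool(pool_size)
for chunk in chunks(foo(6), pool_size):
    pool.map(bar, chunk)  # blocks until this chunk is finished
pool.close()
pool.join()

The drawback is that each chunk acts as a barrier: no new items are read from the generator until every item in the current chunk has finished.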

I ran into a similar problem, where I needed to read a large amount of data and process parts of it in parallel. I solved it by subclassing multiprocessing.Process and using queues. I think you would benefit from reading up on embarrassingly parallel problems. Sample code is below:

import multiprocessing
import time
import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s  %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M:%S')
#Producer class
class foo(multiprocessing.Process):
    def __init__(self, n, queues):
        super(foo, self).__init__()
        self.n=n
        self.queues = queues
    def run(self):
        logging.info('Starting foo producer')
        for i in range(self.n):
            logging.info('foo: Sending "%d" to a consumer' % (i))
            self.queues[i%len(self.queues)].put(i)
            time.sleep(1)#Unnecessary sleep to demonstrate order of events
        for q in self.queues:
            q.put('end')
        logging.info('Ending foo producer')
        return
#Consumer class
class bar(multiprocessing.Process):
    def __init__(self, idx, queue):
        super(bar, self).__init__()
        self.idx = idx
        self.queue = queue
    def run(self):
        logging.info("Starting bar %d consumer" % (self.idx ))
        while True:
            fooput = self.queue.get()
            if type(fooput)==str and fooput=='end':
                break
            logging.info('bar %d: Got "%d" from foo' % (self.idx, fooput))
            time.sleep(2)#Unnecessary sleep to demonstrate order of events
        logging.info("Ending bar %d consumer" % (self.idx ))
        return

if __name__=='__main__':
    #make queues to put data read by foo
    count_queues = 2
    queues =[]
    for i in range(count_queues):
        q = multiprocessing.Queue(2)
        # Give queue size according to your buffer requirements
        queues.append(q)
    #make reader for reading data. lets call this object Producer
    foo_object = foo(6, queues)
    #make receivers for the data. Lets call these Consumers
    #Each consumer is assigned a queue
    bar_objects = []
    for idx, q in enumerate(queues):
        bar_object = bar(idx, q)
        bar_objects.append(bar_object)
    # start the consumer processes
    for bar_object in bar_objects:
        bar_object.start()

    # start the producer processes
    foo_object.start()

    #Join all started processes
    for bar_object in bar_objects:
        bar_object.join()
    foo_object.join()

The best I have been able to come up with myself is:

pool_size = 2
pool = Pool(pool_size)
count = 0
for i in foo(6):
    count += 1
    if count % pool_size == 0:
        pool.apply(bar, args=(i,))        # blocking: wait until this batch drains
    else:
        pool.apply_async(bar, args=(i,))  # non-blocking
pool.close()
pool.join()

For pool_size=2 it outputs:

1440798963.389791 get 0
1440798963.490108 get 1
1440798963.490683 start 0
1440798963.595587 start 1
1440798964.491828 end 0
1440798964.596687 end 1
1440798964.597137 get 2
1440798964.697373 get 3
1440798964.697629 start 2
1440798964.798024 start 3
1440798965.698719 end 2
1440798965.799108 end 3
1440798965.799419 get 4
1440798965.899689 get 5
1440798965.899984 start 4
1440798966.001016 start 5
1440798966.901050 end 4
1440798967.002097 end 5

For pool_size=3 it outputs:

1440799101.917546 get 0
1440799102.018438 start 0
1440799102.017869 get 1
1440799102.118868 get 2
1440799102.119903 start 1
1440799102.219616 start 2
1440799103.019600 end 0
1440799103.121066 end 1
1440799103.220746 end 2
1440799103.221124 get 3
1440799103.321402 get 4
1440799103.321664 start 3
1440799103.422589 get 5
1440799103.422824 start 4
1440799103.523286 start 5
1440799104.322934 end 3
1440799104.423878 end 4
1440799104.524350 end 5

However, as soon as the blocking apply() returns, the next pool_size items are pulled from the iterator all at once. If the processing time per item varies, this will not work well.
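A sketch of one way around that limitation, under the assumption that the throttled helper and the limit value below are introduced here rather than taken from the question: wrap the generator so that the pool's internal feeder thread can only read a bounded number of items ahead, and release one slot each time a result is consumed from imap. A new item is then fetched as soon as any result comes back, rather than in fixed batches:

import threading
from multiprocessing import Pool

def throttled(iterable, semaphore):
    # The pool's feeder thread consumes this generator in the main
    # process, so a threading.Semaphore is enough to hold it back
    for item in iterable:
        semaphore.acquire()  # blocks once `limit` items are outstanding
        yield item

pool_size = 2
limit = pool_size * 2           # how far foo(6) may be read ahead
sem = threading.Semaphore(limit)
pool = Pool(pool_size)
for result in pool.imap(bar, throttled(foo(6), sem)):
    sem.release()               # a result came back, allow one more read
pool.close()
pool.join()

With this, memory use is bounded by limit items plus whatever is in flight, regardless of how long individual items take to process.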
