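Using Python 3.4.3, I have a generator function foo that yields data to be processed in parallel. I pass it to multiprocessing.Pool.map with n processes and expected the generator to be consumed n items at a time: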
from multiprocessing import Pool
import time

now = time.time

def foo(n):
    for i in range(n):
        print("%f get %d" % (now(), i))
        yield i

def bar(i):
    print("%f start %d" % (now(), i))
    time.sleep(1)
    print("%f end %d" % (now(), i))

pool = Pool(2)
pool.map(bar, foo(6))
pool.close()
pool.join()
Unfortunately, the generator is run through all 6 items immediately. The output is:
1440713274.290760 get 0
1440713274.290827 get 1
1440713274.290839 get 2
1440713274.290849 get 3
1440713274.290858 get 4
1440713274.290867 get 5
1440713274.291526 start 0
1440713274.291654 start 1
1440713275.292680 end 0
1440713275.292803 end 1
1440713275.293056 start 2
1440713275.293129 start 3
1440713276.294106 end 2
1440713276.294182 end 3
1440713276.294344 start 4
1440713276.294390 start 5
1440713277.294803 end 4
1440713277.294859 end 5
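The eager consumption comes from Pool.map itself. In CPython, map converts any iterable that has no `__len__` into a list before it dispatches a single task, so a generator is exhausted up front. The sketch below shows this with multiprocessing.pool.ThreadPool, which subclasses Pool and shares its map implementation. It is used here only to avoid pickling and start-method concerns. The names gen, task, produced and seen_at_start are illustrative.

```python
from multiprocessing.pool import ThreadPool

produced = []       # items the generator has yielded so far
seen_at_start = []  # len(produced) as observed by each task when it starts


def gen(n):
    for i in range(n):
        produced.append(i)
        yield i


def task(i):
    # Record how far the generator had already run when this task began.
    seen_at_start.append(len(produced))
    return i


with ThreadPool(2) as pool:
    pool.map(task, gen(6))

print(seen_at_start)  # [6, 6, 6, 6, 6, 6]: the list was built before any task ran
```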
But I was hoping for something more like:
1440714272.612041 get 0
1440714272.612078 get 1
1440714272.612090 start 0
1440714272.612100 start 1
1440714273.613174 end 0
1440714273.613247 end 1
1440714273.613264 get 2
1440714273.613276 get 3
1440714273.613287 start 2
1440714273.613298 start 3
1440714274.614357 end 2
1440714274.614423 end 3
1440714274.614432 get 4
1440714274.614437 get 5
1440714274.614443 start 4
1440714274.614448 start 5
1440714275.615475 end 4
1440714275.615549 end 5
(The reason is that foo reads a large amount of data into memory.)
I get the same result with pool.imap(bar, foo(6), 2) and with

for i in foo(6):
    pool.apply_async(bar, args=(i,))

What is the simplest way to get the behavior I want?
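I had a similar problem: I needed to read a large amount of data and process chunks of it in parallel. I solved it by subclassing multiprocessing.Process and using queues. I think you would benefit from reading about embarrassingly parallel problems. Example code is below: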
import multiprocessing
import time
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M:%S')

# Producer class
class foo(multiprocessing.Process):
    def __init__(self, n, queues):
        super(foo, self).__init__()
        self.n = n
        self.queues = queues

    def run(self):
        logging.info('Starting foo producer')
        for i in range(self.n):
            logging.info('foo: Sending "%d" to a consumer' % (i))
            self.queues[i % len(self.queues)].put(i)
            time.sleep(1)  # Unnecessary sleep to demonstrate order of events
        for q in self.queues:
            q.put('end')
        logging.info('Ending foo producer')
        return

# Consumer class
class bar(multiprocessing.Process):
    def __init__(self, idx, queue):
        super(bar, self).__init__()
        self.idx = idx
        self.queue = queue

    def run(self):
        logging.info("Starting bar %d consumer" % (self.idx))
        while True:
            fooput = self.queue.get()
            if type(fooput) == str and fooput == 'end':
                break
            logging.info('bar %d: Got "%d" from foo' % (self.idx, fooput))
            time.sleep(2)  # Unnecessary sleep to demonstrate order of events
        logging.info("Ending bar %d consumer" % (self.idx))
        return

if __name__ == '__main__':
    # Make queues to hold the data read by foo
    count_queues = 2
    queues = []
    for i in range(count_queues):
        q = multiprocessing.Queue(2)
        # Choose the queue size according to your buffer requirements
        queues.append(q)
    # Make a reader for the data. Let's call this object the Producer
    foo_object = foo(6, queues)
    # Make receivers for the data. Let's call these the Consumers
    # Each consumer is assigned one queue
    bar_objects = []
    for idx, q in enumerate(queues):
        bar_object = bar(idx, q)
        bar_objects.append(bar_object)
    # Start the consumer processes
    for bar_object in bar_objects:
        bar_object.start()
    # Start the producer process
    foo_object.start()
    # Join all started processes
    for bar_object in bar_objects:
        bar_object.join()
    foo_object.join()
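The throttling in this answer comes from the bounded queues. multiprocessing.Queue(2) makes a blocking put() wait once two items are waiting to be read, so foo cannot read far ahead of the consumers. A minimal sketch of that back-pressure, using a non-blocking put so the effect is visible instead of hanging:

```python
import multiprocessing
import queue

q = multiprocessing.Queue(2)  # maxsize 2, as in the answer's queues
q.put(0)
q.put(1)

try:
    q.put(2, block=False)  # a blocking put() would wait here instead
    third_accepted = True
except queue.Full:
    third_accepted = False

drained = [q.get(), q.get()]  # each get() frees one slot for the producer
q.put(2, block=False)         # succeeds now that there is room
drained.append(q.get())

print(third_accepted, drained)  # False [0, 1, 2]
```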
The best I could come up with myself is:
pool_size = 2
pool = Pool(pool_size)
count = 0
for i in foo(6):
    count += 1
    if count % pool_size == 0:
        pool.apply(bar, args=(i,))
    else:
        pool.apply_async(bar, args=(i,))
pool.close()
pool.join()
For pool_size=2, it outputs:
1440798963.389791 get 0
1440798963.490108 get 1
1440798963.490683 start 0
1440798963.595587 start 1
1440798964.491828 end 0
1440798964.596687 end 1
1440798964.597137 get 2
1440798964.697373 get 3
1440798964.697629 start 2
1440798964.798024 start 3
1440798965.698719 end 2
1440798965.799108 end 3
1440798965.799419 get 4
1440798965.899689 get 5
1440798965.899984 start 4
1440798966.001016 start 5
1440798966.901050 end 4
1440798967.002097 end 5
For pool_size=3, it outputs:
1440799101.917546 get 0
1440799102.018438 start 0
1440799102.017869 get 1
1440799102.118868 get 2
1440799102.119903 start 1
1440799102.219616 start 2
1440799103.019600 end 0
1440799103.121066 end 1
1440799103.220746 end 2
1440799103.221124 get 3
1440799103.321402 get 4
1440799103.321664 start 3
1440799103.422589 get 5
1440799103.422824 start 4
1440799103.523286 start 5
1440799104.322934 end 3
1440799104.423878 end 4
1440799104.524350 end 5
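However, as soon as the blocking apply call returns, the loop pulls 3 new items from the iterator. If processing time varies from item to item, this won't work well.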
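One way around the fixed-batch limitation is to refill a slot as soon as *any* task finishes, rather than waiting on a blocking apply. Below is a minimal sketch built around a hypothetical helper, throttled_apply, which is not part of multiprocessing. It acquires a threading.BoundedSemaphore before advancing the generator and releases it from apply_async's callback/error_callback. As a result, at most max_pending items are pulled but unfinished at any time, even when task durations vary. The demo uses multiprocessing.pool.ThreadPool so it runs without pickling concerns. With multiprocessing.Pool the callbacks still run in a thread of the parent process, so the same helper should apply, provided func is a module-level function and the pool is created under the `__main__` guard.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def throttled_apply(pool, func, iterable, max_pending):
    """Hypothetical helper: run func(item) for every item, advancing
    `iterable` only while fewer than `max_pending` tasks are unfinished.
    Returns the results in input order."""
    slots = threading.BoundedSemaphore(max_pending)
    results = []

    def release(_):
        # Runs in the pool's result-handler thread when a task finishes,
        # whether it returned or raised, freeing one slot.
        slots.release()

    it = iter(iterable)
    while True:
        slots.acquire()  # wait for a free slot *before* touching the generator
        try:
            item = next(it)
        except StopIteration:
            slots.release()
            break
        results.append(pool.apply_async(func, (item,), callback=release,
                                        error_callback=release))
    # .get() waits for each task and re-raises any worker exception.
    return [r.get() for r in results]


def foo(n):
    for i in range(n):
        print("%f get %d" % (time.time(), i))
        yield i


def bar(i):
    print("%f start %d" % (time.time(), i))
    time.sleep(1)
    print("%f end %d" % (time.time(), i))
    return i


if __name__ == "__main__":
    with ThreadPool(2) as pool:
        print(throttled_apply(pool, bar, foo(6), max_pending=2))
```

Setting max_pending to the pool size keeps memory bounded to roughly one item per worker. Raising it slightly lets the generator read the next item while workers are still busy, at the cost of holding that many extra items in memory.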