Using a queue with a process pool, i.e. something like
Pool(2).map(f, xs)
where the body of f can append to the items being mapped, seems like a common pattern. For example:
from multiprocessing import Pool

xs = [0]

def f(n):
    global xs
    if n < 10:
        xs.append(n + 1)
    return n

Pool(2).map(f, xs)
expecting it to return [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].
I realize it is possible to build this pattern "by hand" from the primitives that multiprocessing provides, but it seems like a common enough pattern that there must be an off-the-shelf solution. Do you know of one?
Following @martineau's suggestion, your code can be updated to:
import multiprocessing as mp

def f(n, xs, xn):
    if n < 10:
        xn.append(n)
        xs.append(n + 1)
        xn.append(n)
        xs.append(n + 2)

if __name__ == '__main__':
    with mp.Manager() as manager:
        # manager.list() returns proxies that every worker process can append to
        xs = manager.list()
        xn = manager.list()
        with mp.Pool(processes=2) as pool:
            pool.starmap(f, [(n, xs, xn) for n in range(20)])
        print(xn)
        print(xs)
This prints
[3, 0, 3, 0, 4, 1, 4, 1, 5, 2, 5, 2, 6, 6, 7, 9, 7, 9, 8, 8]
[4, 1, 5, 2, 5, 2, 6, 3, 6, 3, 7, 4, 7, 8, 8, 10, 9, 11, 9, 10]
For this case there is no guarantee that the order in which the n values are generated is preserved.
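If the ordering matters, one workaround (a sketch of my own, not part of the answer above) is to have the worker return its result instead of appending to a shared list, since pool.map hands results back in the order of its input:

import multiprocessing as mp

def g(n):
    # hypothetical worker: return an (n, n + 1) pair instead of mutating shared state
    if n < 10:
        return (n, n + 1)
    return None

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        results = pool.map(g, range(20))  # results come back in input order
    print([r for r in results if r is not None])
    # [(0, 1), (1, 2), ..., (9, 10)]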
Edit:
import multiprocessing as mp

def f(n):
    thresh = 10
    # xs is the manager list created below; the workers see it because they
    # inherit the parent's globals under the fork start method
    if max(xs) <= thresh and n < thresh:
        xs.append(n + 1)

if __name__ == '__main__':
    with mp.Manager() as manager:
        xs = manager.list([0])
        with mp.Pool(processes=2) as pool:
            pool.map(f, range(20))
        print(sorted(xs))
This prints
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
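As the comment notes, that version relies on the workers inheriting the global xs via fork. A sketch of the same idea (the init_worker name is mine) that passes the managed list explicitly through a Pool initializer, so it also runs under the spawn start method:

import multiprocessing as mp

def init_worker(shared_xs):
    # store the list proxy in a module-level global inside each worker
    global xs
    xs = shared_xs

def f(n):
    thresh = 10
    if max(xs) <= thresh and n < thresh:
        xs.append(n + 1)

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared = manager.list([0])
        with mp.Pool(processes=2, initializer=init_worker,
                     initargs=(shared,)) as pool:
            pool.map(f, range(20))
        print(sorted(shared))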
You can create a class that does this from the primitives:
from multiprocessing import JoinableQueue, Process

class PoolQueue(object):
    def __init__(self, n):
        self.num_procs = n

    def map(self, f, args):
        payloads = JoinableQueue()
        procs = []

        def add_task(arg):
            payloads.put(arg)

        def process_task():
            while True:
                pl = payloads.get()
                f(pl, add_task)
                payloads.task_done()

        # seed the queue, then start the workers; each worker may enqueue
        # more work via add_task before marking its current task done
        for arg in args:
            add_task(arg)
        procs = [Process(target=process_task) for _ in range(self.num_procs)]
        for p in procs:
            p.start()

        # join() returns once every put() has a matching task_done()
        payloads.join()
        for p in procs:
            p.kill()
To test it, run:
from time import sleep
from random import random

def pause():
    sleep(random() / 100)

def process(payload, add_task):
    print(payload)
    pause()
    if payload:
        add_task(payload[:-1])
    return payload

if __name__ == '__main__':
    for x in range(1):
        PoolQueue(2).map(
            process,
            [
                'abcdefghij',
                '0123456789',
                '!@#$%^&*()',
            ],
        )
One catch here is that it deadlocks if the queue grows past 32767 tasks. gevent.queue.JoinableQueue handles that better, but that is beyond the scope of this question.