使用多处理映射的 Python 方式是什么?池,同时还附加到迭代器?



使用带有进程池的队列(即(似乎是一种常见的模式

Pool(2).map(f, xs)

但是其中f的主体可以附加到被映射的项目,例如

from multiprocessing import Pool
xs = [0]
def f(n):
global xs
if n < 10:
xs.append(n + 1)
return n
Pool(2).map(f, xs)

期望返回[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

我意识到使用mt提供的原语"手动"构建这个模式是可能的,但这似乎是一个足够常见的模式,必须有一个通用的解决方案。你知道一个吗?

根据@martineau的建议,您的代码可以更新为:

import multiprocessing as mp

def f(n, xs, xn):
if n < 10:
xn.append(n)
xs.append(n + 1)
xn.append(n)
xs.append(n + 2)

if __name__ == '__main__':
with mp.Manager() as manager:
xs = manager.list()
xn = manager.list()
with mp.Pool(processes=2) as pool:
pool.starmap(f, [(n, xs, xn) for n in range(20)])
print(xn)
print(xs)

这将打印

[3, 0, 3, 0, 4, 1, 4, 1, 5, 2, 5, 2, 6, 6, 7, 9, 7, 9, 8, 8]
[4, 1, 5, 2, 5, 2, 6, 3, 6, 3, 7, 4, 7, 8, 8, 10, 9, 11, 9, 10]

对于这种情况,您无法保证n个值的生成顺序得到保留。

编辑:

import multiprocessing as mp

def f(n):
thresh = 10
if max(xs) <= thresh and n < thresh:
xs.append(n + 1)

if __name__ == '__main__':
with mp.Manager() as manager:
xs = manager.list([0])
with mp.Pool(processes=2) as pool:
pool.map(f, range(20))
print(sorted(xs))

这个打印

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

您可以创建一个类,该类可以从以下原语中执行此操作:

from multiprocessing import JoinableQueue, Process

class PoolQueue(object):
def __init__(self, n):
self.num_procs = n
def map(self, f, args):
payloads = JoinableQueue()
procs = []
def add_task(arg):
payloads.put(arg)
def process_task():
while True:
pl = payloads.get()
f(pl, add_task)
payloads.task_done()
for arg in args:
add_task(arg)
procs = [Process(target=process_task) for _ in range(self.num_procs)]
for p in procs:
p.start()
payloads.join()
for p in procs:
p.kill()

要测试它,请运行--

from time import sleep
from random import random

def pause():
sleep(random() / 100)

def process(payload, add_task):
print(payload)
pause()
if payload:
add_task(payload[:-1])
return payload

if __name__ == '__main__':
for x in range(1):
PoolQueue(2).map(
process,
[
'abcdefghij',
'0123456789',
'!@#$%^&*()',
],
)

这里的一个问题是,它死锁了队列增长超过32767个任务。gevent.queue.JoinableQueue可以更好地处理这个问题,但这超出了这个问题的范围。

相关内容

  • 没有找到相关文章

最新更新