我有一个使用多处理库来计算一些东西的程序。大约有 10K 个输入需要计算,每个输入需要 0.2 秒到 10 秒。
我目前的方法使用池:
# Inputs
signals = [list(s) for s in itertools.combinations_with_replacement(possible_inputs, 3)]
# Compute
with mp.Pool(processes = N) as p:
p.starmap(compute_solutions, [(s, t0, tf, folder) for s in signals])
print (" | Computation done.")
我注意到在要检查的 300/400 最后一个输入上,程序变得慢了很多。我的问题是:Pool
和starmap()
如何表现?
根据我的观察,我相信如果我得到 10K 输入和N = 4
(4 个进程),那么 2 500 个第一个输入分配给第一个进程,第二个进程旁边的 2 500 个,...每个进程都以串行方式处理其输入。 这意味着,如果某些进程在其他进程之前清除了队列,则它们不会执行新任务。
这是对的吗?
如果这是正确的,我怎么能有一个更智能的系统,可以用这个伪代码来表示:
workers = Initialize N workers
tasks = A list of the tasks to perform
for task in tasks:
if a worker is free:
submit task to this worker
else:
wait
感谢您的帮助:)
注意:不同的地图功能有什么区别。我相信map()
、imap_unordered()
、imap
、starmap
存在。
它们之间有什么区别,我们什么时候应该使用其中一个?
,如果某些进程在其他进程之前清除了队列,则它们不会执行新任务。
这是对的吗?
不。multiprocess.Pool()
的主要目的是将传递的工作负载分散到其工作线程池中 - 这就是为什么它带有所有这些映射选项 - 其各种方法之间的唯一区别在于工作负载的实际分布方式以及如何收集结果回报。
在您的情况下,使用[(s, t0, tf, folder) for s in signals]
生成的可迭代对象将使其每个元素(最终取决于signals
大小)发送到池中的下一个自由工作线程(调用为compute_solutions(s, t0, tf, folder)
),一次一个(如果传递chunksize
参数,则更多),直到整个可迭代对象用尽。您无法控制哪个工人执行哪个部分。
工作负载也可能不均匀分布 - 一个工作线程可能比另一个工作线程处理更多的条目,具体取决于资源使用情况、执行速度、各种内部事件......
然而,使用map
、imap
和starmap
multiprocessing.Pool
方法,你会得到均匀有序传播的错觉,因为它们在内部同步来自每个工作线程的返回以匹配源可迭代对象(即结果的第一个元素将包含被调用函数与可迭代对象的第一个元素的结果返回)。如果你想看看下面实际发生了什么,你可以尝试这些方法的异步/无序版本。
因此,默认情况下,您将获得更智能的系统,但如果您想完全控制您的工作人员池,您可以随时使用multiprocessing.Pool.apply_async()
。
作为旁注,如果您正在寻找优化对可迭代对象本身的访问(因为池映射选项将消耗其中的很大一部分),您可以查看此答案。
最后
它们之间有什么区别,我们什么时候应该使用其中一个?
与其我在这里引用,不如转到官方文档,因为对它们之间的差异有很好的解释。