例如,将 17 个任务分配给 2 个进程,每块 6 个将是 2*6+5,也就是说每个进程得到 6 个,但剩余 5 个任务。我认为最好的方法是将剩余的任务均匀分布在 2 个过程中。所以我做了一个测试,看看mp.Pool
是否这样做。这是一个测试脚本
import multiprocessing as mp
import time
def f(x):
time.sleep(0.1)
print(mp.current_process())
if __name__ == "__main__":
p = mp.Pool(2)
p.map(f, range(17),6)
将输出
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-2, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-2, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-2, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-2, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-2, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-2, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
<ForkProcess(ForkPoolWorker-1, started daemon)>
注意最后 5 行,很明显剩余的任务都分发到一个进程,那么其他所有进程都无关紧要。
有没有一种好方法可以在Pool.map
中自动分配剩余的块?
不是简单的参数变化。而且,当流程已分配给工作人员时,您也不会重新分配流程。
当然,您可以编写一个解决方法:
if __name__ == "__main__":
p = mp.Pool(2)
tasks = 17
chunksize = 6
p.map(f, range((tasks / chunksize) * chunksize), 6)
p.map(f, range((tasks / chunksize) * chunksize + 1, tasks))
这将首先完成完整的块。然后,其余五个任务将逐个分配给池,而不是分块分配。现在,这将产生所需的结果。
不过,我不确定为什么这很重要。如果任务需要很长时间才能完成,那么使用块有什么意义?不会有任何性能提升,因为时间会花在工人身上。使用默认块大小 1 绝对可以,只要有空就将任务分配给工作人员。
如果任务在极短的时间内完成,并且将花费更多时间将数据逐个传输给工作人员,则块大小可以提高性能。如果是这种情况,那么最后的任务是否仅由一个工人完成并不重要,因为完成这几个任务的时间是微不足道的。
这个特殊的例子当然是最坏的情况。在更现实的场景中,通常不会有任何性能损失。想象一下,以 35 个块的形式发送给工人的数万个任务。不可避免地,在批处理过程中,工人会发散,他们不会完全相同地完成。有些块会更快,有些会更慢。现在让我们假设你的余数不是 34 而是 17。 如果工作线程 A 现在已启动最后一个完整块,并且在工作线程 B 完成其最后一个完整块并获得剩余的 17 个任务时处理了不到一半的任务,则尽管工作线程 B 实际上在工作线程 A 之前完成,尽管已经处理了剩余的集合。
但是,如果有理由先做块,其余的分配给工人,那么这就是这样做的方法。不过,我确实认为您正在尝试解决一个不是问题的问题,并且为了难以衡量的性能提升,您只会增加代码的复杂性。