我正在尝试以下代码:
from multiprocessing import Pool
def f(x):
return x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))
据我了解,5 个处理器可以0, 1, 2, 3, 4
执行操作。如果处理器 1 完成了它的工作,它是在其余处理器忙于1,2,3,4
时立即5
,还是所有处理器的代码完成,以便下一批将一起5, 6, 7, 8, 9
,依此类推。如果后者发生,我如何实现上述代码,以便在处理器空闲时获得分配给它的新作业?
如何测试实现?
线程池会立即生成一个新线程(添加到您的示例中)。请注意线程 4 花费的时间足够长,以至于第 12 个任务能够启动。
PS我刚刚注意到你忘记了10。
from multiprocessing import Pool
import time
import random
def f(x):
print "Enter %s" % x
time.sleep( random.randrange(1,100,1)/10.0 )
print "Exit %s" % x
return x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))
Enter 0
Enter 1
Enter 2
Enter 3
Enter 4
Exit 0
Enter 5
Exit 3
Enter 6
Exit 2
Enter 7
Exit 5
Enter 8
Exit 1
Enter 9
Exit 6
Enter 11
Exit 11
Enter 12
Exit 4
Enter 13
Exit 7
Exit 12
Exit 9
Exit 8
Exit 13
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]
是的,这种情况是可能的。首先将输入分区为单独的任务。当任务大小不等(就处理时间而言)并且任务太少而无法填补空白时,就会出现问题。
从文档中:
map(func,iterable[,chunksize])
此方法将可迭代对象切成多个块,并将其作为单独的任务提交到进程池。(近似)尺寸 可以通过将块大小设置为正数来指定这些块 整数。
例
为了说明此行为,我更改了f(x)
,因此需要x
秒才能完成。
from multiprocessing import Pool
import time
import threading
def f(x):
print('x: ' + str(x) + 'tThread ID: ' + str(threading.get_ident()))
time.sleep(x)
if __name__ == '__main__':
chunksize = 3
with Pool(2) as p:
p.map(f, [10, 1, 1, 1, 1, 1], chunksize)
输入数组[10, 1, 1, 1, 1, 1]
分为len(arr) / chunksize = 2
组:
[10, 1, 1] # For thread 1, takes 12 seconds to finish
[ 1, 1, 1] # For thread 2, takes 3 seconds to finish
因此,线程2 将在 3 秒后完成,而线程 1 将继续工作 9 秒。
示例输出:
x: 10 Thread ID: 8556
x: 1 Thread ID: 59180
x: 1 Thread ID: 59180
x: 1 Thread ID: 59180
x: 1 Thread ID: 8556
x: 1 Thread ID: 8556
如果您发现自己处于这种情况,那么您可以强制较小的chunksize
。值 1 可确保工作负载尽可能平衡,但代价是开销更高。