我有一个大列表。我想处理每一项。我想对列表进行分段,并在不同的CPU上处理每个分段。我正在使用pathos多处理库。我创建了以下功能:
def map_list_in_segments (l, f):
cpus = max(1, int(cpu_count() / 2) - 1)
seg_length = int(len(l) / cpus)
segments = [l[x:x+seg_length] for x in range(0,len(l),seg_length)]
pool = Pool(nodes=cpus)
mapped_segments = pool.map(lambda seg: f(seg), segments)
return (sg for seg in mapped_segments for sg in seg)
它返回正确的结果并使用所有(或几乎所有(CPU。然而,对返回的列表进行迭代会导致意外消耗大量内存。
起初,我是在返回一份理解列表。我把它换成了一个生成器,希望能减少内存消耗,但这并没有改善任何情况。
基于评论的更新:
我不知道imap
和uimap
,也不知道它们会自动对输入列表进行分块。我尝试了uimap
,但发现CPU利用率很低,运行时间很长。其中一个进程的CPU利用率非常高。我认为正在发生的是,有很多酸洗正在进行。我传递的f
在一个闭包中有一个大对象。当使用ProcessingPool方法(map
、imap
、uimap
(时,需要为列表中的每个元素对该对象进行pickle。我怀疑这就是一个非常繁忙的过程正在做的事情。其他工艺都被这种酸洗所抑制。
如果是这样的话,这就解释了为什么我的手动分段会显著提高CPU利用率:大对象只需要在每个分段中腌制一次,而不是为每个项目腌制一次。
然后,我尝试在我的map_list_in_segments
中使用uimap
,希望能降低内存消耗,但这并没有发生。以下是调用方法并迭代结果的代码的外观:
segments = multiprocessing.map_list_in_segments(l, lambda seg: process_segment(seg, large_object_needed_for_processing))
for seg in segments:
for item in seg:
# do something with item
我对生成器的(有限的(理解是,循环通过段的第一个for
循环应该在迭代时从内存中释放每一个。如果是这样的话,那么大的内存使用似乎是对process_segment
方法的返回值的酸洗。我不会返回大量数据(每个项目大约1K字节(,我正在处理的l
的大小是6000个项目。不确定为什么会消耗5GB的内存。
multiprocessing
的问题是进程之间的通信成本很高。如果你的结果在大小上与你的输入相当,你可能会把大部分时间花在整理和取消整理数据上,而不是做任何有用的事情。这取决于f
的价格,但您最好不要在这里使用multiprocessing
。
一些进一步的测试表明酸洗不是问题所在。我在for item in seg
中所做的处理是构造消耗大量内存的附加对象。
从这次练习中得出的见解和聪明的评论者:
- ProcessPool方法(
map
、imap
、uimap
(会自动对列表进行分块 - 如果您将一个大对象传递给
f
(通过闭包(,您可能会发现手动分块列表(如上所述(可以节省大量的酸洗时间,并提高CPU利用率 - 使用CCD_ 21和CCD_