我有一个大型数据集,我希望我的脚本对其进行迭代,对每个条目执行一系列操作,然后将结果安排到HDD存储。由于数据集可能相对较大 (~250 GB(,RAM 可用性要求数据集一次以 1000 个条目的块(我在下面的代码中称为 dataBlock(进行处理。我还使用multiprocessing.Pool
类来简化使用多个 CPU 内核来完成此任务。
我基本上已经安排好了事情,以便将每个数据块传递到池,池使用imap
方法对数据块执行所需的计算,池返回计算结果,数据块的结果附加到列表中。这个列表(processed_data
(是计算集合的期望最终产品。
processed_data = []
multiprocessing.Pool(processor_cap) as pool:
for blockIndex, block in enumerate(range(1000, height-remainder, 1000)):
#Read-in 1000 spectra from source dataset
dataBlock = np.asarray(raw_dset[blockIndex*1000:block][:])
'''
Pass data block to processor pool, which iterates through data
block. Each spectrum is handed off to a CPU in the pool,
which centroids it and appends the result to "processed_block".
'''
processed_block = pool.imap(centroid_spectrum, dataBlock)
#Append processed spectra to processed data bin
for idx, processed_spectrum in enumerate(processed_block):
processed_data.append(processed_spectrum)
我想知道的是如何在调用pool.imap()
后使脚本暂停,直到在不关闭池的情况下返回完整的processed_block
。目前,它直接进入for
循环,该循环紧随上面的截图,而无需等待pool.imap
返回processed_block
。我尝试在pool.imap()
调用后立即调用pool.join()
,但它只返回***AssertionError
并再次继续其下方的for
循环。我最终可以成功调用pool.close()
,并在脚本中稍后将所有 dataBlock 馈送到池中后pool.join()
,就在上面最外层for
循环的末尾下方。
提前感谢您的帮助!
如果不付出很多努力来改变事情,很难处理您的示例; 但是如果你有一个来自 imap(( 调用的迭代器,那么你可以考虑在到达 for 循环之前将迭代器的元素解析为列表:
processed_block = pool.imap(centroid_spectrum, dataBlock)
processed_block = [ x for x in processed_block ] # convert from an iterator to a list
for idx, processed_spectrum in enumerate(processed_block):
等。
这是否达到了您想要的效果?
我只是将Pool.imap()
调用更改为Pool.map()
调用,脚本按预期运行。请参阅我与Mikhail Burshteyn的交流以获取更多信息。