Python 多处理池;等待迭代完成



我有一个大型数据集,我希望我的脚本对其进行迭代,对每个条目执行一系列操作,然后将结果安排到HDD存储。由于数据集可能相对较大 (~250 GB(,RAM 可用性要求数据集一次以 1000 个条目的块(我在下面的代码中称为 dataBlock(进行处理。我还使用multiprocessing.Pool类来简化使用多个 CPU 内核来完成此任务。

我基本上已经安排好了事情,以便将每个数据块传递到池,池使用imap方法对数据块执行所需的计算,池返回计算结果,数据块的结果附加到列表中。这个列表(processed_data(是计算集合的期望最终产品。

processed_data = []
multiprocessing.Pool(processor_cap) as pool:
for blockIndex, block in enumerate(range(1000, height-remainder, 1000)):
#Read-in 1000 spectra from source dataset
dataBlock = np.asarray(raw_dset[blockIndex*1000:block][:])
'''
Pass data block to processor pool, which iterates through data
block. Each spectrum is handed off to a CPU in the pool,
which centroids it and appends the result to "processed_block".
'''
processed_block = pool.imap(centroid_spectrum, dataBlock)
#Append processed spectra to processed data bin
for idx, processed_spectrum in enumerate(processed_block):
processed_data.append(processed_spectrum)

我想知道的是如何在调用pool.imap()后使脚本暂停,直到在不关闭池的情况下返回完整的processed_block。目前,它直接进入for循环,该循环紧随上面的截图,而无需等待pool.imap返回processed_block。我尝试在pool.imap()调用后立即调用pool.join(),但它只返回***AssertionError并再次继续其下方的for循环。我最终可以成功调用pool.close(),并在脚本中稍后将所有 dataBlock 馈送到池中后pool.join(),就在上面最外层for循环的末尾下方。

提前感谢您的帮助!

如果不付出很多努力来改变事情,很难处理您的示例; 但是如果你有一个来自 imap(( 调用的迭代器,那么你可以考虑在到达 for 循环之前将迭代器的元素解析为列表:

processed_block = pool.imap(centroid_spectrum, dataBlock)
processed_block = [ x for x in processed_block ] # convert from an iterator to a list
for idx, processed_spectrum in enumerate(processed_block):

等。

这是否达到了您想要的效果?

我只是将Pool.imap()调用更改为Pool.map()调用,脚本按预期运行。请参阅我与Mikhail Burshteyn的交流以获取更多信息。

相关内容

  • 没有找到相关文章

最新更新