'多处理.Pool.map()' 似乎调度错误



>我有一个函数,可以请求服务器,检索一些数据,处理它并保存一个csv文件。此功能应启动 20k 次。每次执行的持续时间不同:有时持续时间超过 20 分钟,有时不到一秒。我决定使用multiprocessing.Pool.map并行执行。我的代码如下所示:

def get_data_and_process_it(filename):
print('getting', filename)
...
print(filename, 'has been process')
with Pool(8) as p:
p.map(get_data_and_process_it, long_list_of_filenames)

查看prints是如何生成的,似乎long_list_of_filenames它被分成 8 个部分并分配给每个部分CPU因为有时只是在一个 20 分钟的执行中被阻止,而在这 20 分钟内没有处理其他long_list_of_filenames元素。我期望map以FIFO风格调度CPU内核中的每个元素。

对于我的情况,有更好的方法吗?

map方法仅在所有操作完成后返回。

从泳池工人打印并不理想。首先,像stdout这样的文件使用缓冲,因此在打印消息和实际显示之间可能存在可变的时间量。此外,由于所有工人都继承相同的stdout,他们的输出将变得相互交织,甚至可能是乱码。

所以我建议改用imap_unordered。这将返回一个迭代器,该迭代器将在结果可用时立即开始生成结果。唯一的问题是,这按它们完成的顺序返回结果,而不是按它们开始的顺序返回结果。

你的工作线程函数(get_data_and_process_it)应该返回某种状态指示器。例如,文件名和结果的元组。

def get_data_and_process_it(filename):
...
if (error):
return (filename, f'has *failed* bacause of {reason}')
return (filename, 'has been processed')

然后,您可以执行以下操作:

with Pool(8) as p:
for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
print(fn, res)

这提供了有关作业何时完成的准确信息,并且由于只有父进程写入stdout,因此输出没有乱码的变化。

此外,我建议在程序开始时的某个地方使用sys.stdout.reconfigure(line_buffering=True)。这可确保在每行输出后刷新stdout流。

map正在阻塞,而不是p.map您可以使用p.map_asyncmap将等待所有这些函数调用完成,以便我们连续看到所有结果。map_async以随机顺序执行工作,并且在开始新任务之前不会等待正在进行的任务完成。这是最快的方法。(欲了解更多信息)还有一个SO线程详细讨论了mapmap_async

多处理池类为我们处理排队逻辑。它非常适合并行运行网络抓取作业(示例)或任何可以独立分解和分发的工作。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看Queue类(更多信息)。

相关内容

  • 没有找到相关文章

最新更新