>我有一个函数,可以请求服务器,检索一些数据,处理它并保存一个csv文件。此功能应启动 20k 次。每次执行的持续时间不同:有时持续时间超过 20 分钟,有时不到一秒。我决定使用multiprocessing.Pool.map
并行执行。我的代码如下所示:
def get_data_and_process_it(filename):
print('getting', filename)
...
print(filename, 'has been process')
with Pool(8) as p:
p.map(get_data_and_process_it, long_list_of_filenames)
查看prints
是如何生成的,似乎long_list_of_filenames
它被分成 8 个部分并分配给每个部分CPU
因为有时只是在一个 20 分钟的执行中被阻止,而在这 20 分钟内没有处理其他long_list_of_filenames
元素。我期望map
以FIFO风格调度CPU内核中的每个元素。
对于我的情况,有更好的方法吗?
map
方法仅在所有操作完成后返回。
从泳池工人打印并不理想。首先,像stdout
这样的文件使用缓冲,因此在打印消息和实际显示之间可能存在可变的时间量。此外,由于所有工人都继承相同的stdout
,他们的输出将变得相互交织,甚至可能是乱码。
所以我建议改用imap_unordered
。这将返回一个迭代器,该迭代器将在结果可用时立即开始生成结果。唯一的问题是,这按它们完成的顺序返回结果,而不是按它们开始的顺序返回结果。
你的工作线程函数(get_data_and_process_it
)应该返回某种状态指示器。例如,文件名和结果的元组。
def get_data_and_process_it(filename):
...
if (error):
return (filename, f'has *failed* bacause of {reason}')
return (filename, 'has been processed')
然后,您可以执行以下操作:
with Pool(8) as p:
for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
print(fn, res)
这提供了有关作业何时完成的准确信息,并且由于只有父进程写入stdout
,因此输出没有乱码的变化。
此外,我建议在程序开始时的某个地方使用sys.stdout.reconfigure(line_buffering=True)
。这可确保在每行输出后刷新stdout
流。
map
正在阻塞,而不是p.map
您可以使用p.map_async
。map
将等待所有这些函数调用完成,以便我们连续看到所有结果。map_async
以随机顺序执行工作,并且在开始新任务之前不会等待正在进行的任务完成。这是最快的方法。(欲了解更多信息)还有一个SO线程详细讨论了map
和map_async
。
多处理池类为我们处理排队逻辑。它非常适合并行运行网络抓取作业(示例)或任何可以独立分解和分发的工作。如果您需要对队列进行更多控制或需要在多个进程之间共享数据,您可能需要查看Queue
类(更多信息)。