多处理器和慢速文件系统



假设我们有以下脚本(read_file.py(,它读取一个文件并将前200000行写入另一个文件。

import pandas as pd
import sys
a = sys.argv[1]
b = sys.argv[2]
df = pd.read_csv(a, header=0, sep="t").head(200000).to_csv(b, header=True, index=False)

让我们有第二个脚本(test-latency.py(,它用多处理(对两个文件(调用第一个脚本。然后读取生成的两个文件并将它们合并。

import pandas as pd
import multiprocessing as mp
import sys
import subprocess
import time
a = sys.argv[1]
b = sys.argv[2]
l = [a, b]
pool = mp.Pool(processes = (mp.cpu_count() - 1))
for filename in l:
f_in = filename
f_out = filename + "out.tsv"
cmd = ['python', 'read_file.py', f_in, f_out]
pool.apply_async(subprocess.Popen, (cmd,))
pool.close()
pool.join()
time.sleep(1)
df1 = pd.read_csv(a + "out.tsv")
df2 = pd.read_csv(b + "out.tsv")
df = pd.merge(df1, df2, on="Name").to_csv("test.tsv", sep="t", header=0)

问题是,根据文件系统的速度(由于NFS缓存(,文件可能在pool.join((之后不存在。这可以通过延迟到文件出现的time.sleep(1)来解决。但这并不是一个最佳的解决方案,因为对于较慢的文件系统,它可能会导致FileNotFoundError: [Errno 2]。一种解决方案是提供一个通用的延迟等待选项,但我认为让用户参与这样的决策并不明智。你对这个问题有什么建议?

文件系统缓存不是你的问题。您正在使用multiprocessing.Pool创建子流程,而这些子流程中的每一个都是本身,从而生成具有subprocess.Popen的子流程。问题是subprocess.Popen只是产生了这个过程,但没有等待完成。因此,即使在multiprocessing.Pool的子进程全部完成之后,这些孙进程可能仍在运行。

一个简单的解决方案是自己管理子流程:

# Store subprocess handles (Popen objects).
subprocesses = []
# Launch subprocesses in the background.
for filename in l:
f_in = filename
f_out = filename + "out.tsv"
proc = subprocess.Popen(['python', 'read_file.py', f_in, f_out])
subprocesses.append(proc)
# Wait for each subprocess to finish.
for proc in subprocesses:
if proc.wait() != 0:
# Error occurred, handle it however you want
raise RuntimeError('Subprocess failed with nonzero exit code')

还有一个问题是,并行执行是否有用,因为I/O(网络或磁盘(可能是瓶颈。但这是你可以自己测试的。

最新更新