当一组任务依赖于另一组任务时,我如何同时完成 python 中的两组任务?



我有大量的小文件要从 s3 下载和处理。

下载速度相当快,因为每个文件只有几兆字节。它们加起来大约是100GB。处理所需的时间大约是下载时间的两倍,并且纯粹是 CPU 密集型的。因此,通过在下载其他文件的同时在多个线程中完成处理,应该可以缩短整体运行时间。

目前,我正在下载一个文件,对其进行处理并移动下一个文件。在python中是否有一种方法可以让我一个接一个地下载所有文件并在完成下载后立即处理每个文件?这里的主要区别在于,当每个文件都在处理时,另一个文件总是在下载。

我的代码如下所示:

files = {'txt': ['filepath1', 'filepath2', ...], 
'tsv': ['filepath1', 'filepath2', ...]
} 
for kind in files.keys():
subprocess.check_call(f'mkdir -p {kind}', shell=True)
subprocess.call(f'mkdir -p {kind}/normalized', shell=True)
for i, file in enumerate(files[kind]):
subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
f = file.split('/')[-1]
subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)

我还编写了一个多处理解决方案,我可以同时下载和处理多个文件,但这并没有导致速度的提高,因为网络速度已经饱和。瓶颈在于处理。我已经把它包括在内,以防它对你们有帮助。

from contextlib import closing
from os import cpu_count
from multiprocessing import Pool
def download_and_proc(file, kind='txt'):
subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
f = file.split('/')[-1]
subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)
with closing(Pool(processes=cpu_count()*2)) as pool:
pool.map(download_and_proc, files)

从长远来看,您当前的多处理代码应该非常接近最佳状态。它不会总是以最大速度下载,因为负责下载文件的相同执行线程将等到文件被处理后再下载另一个文件。但它通常应该在处理中消耗所有 CPU,即使某些网络容量未使用。如果您也尝试始终下载,则最终会耗尽要下载的文件,并且网络将空闲相同的时间,只是在批处理作业结束时全部运行。

一个可能的例外是,如果处理文件所花费的时间始终完全相同。然后,您可能会发现您的工作线程在同步运行,它们都同时下载,然后所有工作线程同时进行,即使工作线程数多于可供它们运行的 CPU。除非处理以某种方式与实时时钟相关联,否则这似乎不太可能发生很长时间。大多数情况下,您会先于其他流程完成一些流程,因此下载最终会错开。

因此,改进您的代码不太可能给您带来太多加速。如果您认为需要它,您可以将下载和处理拆分为两个单独的池。甚至可以在主进程中将其中一个作为单进程循环执行,但我将在此处显示完整的双池版本:

def download_worker(file, kind='txt'):
subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
return file
def processing_worker(file, kind='txt')
f = file.split('/')[-1]
subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)
with Pool() as download_pool, Pool() as processing_pool:
downloaded_iterator = download_pool.imap(download_worker, files)  # imap returns an iterator
processing_pool.map(processing_worker, downloaded_iterator)

这应该与您的系统一样快地下载和处理。如果下载文件花费的时间少于其处理时间,那么很可能第一个池将在第二个池之前完成,代码可以很好地处理。如果处理不是瓶颈,它也将支持它(第二个池在某些时候会处于空闲状态,等待文件完成下载)。

最新更新