asyncio包在从磁盘读取下一组数据时对一组数据进行计算,这是一个用例吗



我有一个处理数百个数据文件的计算管道。目前,它通过让多个进程同时处理自己的文件(使用snakemake(来并行化任务。

我正在重写管道以在GPU上进行计算,这应该是一个显著的加速,但现在并行化是在单个文件的计算中进行的,所以它们是按顺序处理的。

我想,当当前文件在GPU上处理时,我可能可以通过从磁盘读取下一个要处理的文件来节省一些时间,然后在下一次迭代计算时将上一次迭代的结果写入磁盘。

这似乎是asyncio的设计初衷,但我对它太熟悉了。这是我应该走的路吗?还是使用线程包或其他方法实现更好和/或更容易?

是的,我认为你是对的。但是要注意使用asyncio读取/写入文件,有一个陷阱。

由于从技术上讲,文件读取是一种I/O操作,异步应该会带来一些性能提升。它会的,但首先要做的是。

处理文件时的性能提升

很长一段时间以来,python中的文件缺乏良好的aio库。即使是现在,非作弊选项也只能用于linux操作系统(caio库必须可用(。请注意,您的操作系统应该具有对异步文件操作的本机支持。第一次尝试使用文件的python异步库是aiofiles。这个图书馆有点作弊。您可以自己深入研究,但长话短说,它使用线程来模拟读/写任务的并行化。由于python GIL,这不会导致任何性能提升(事实上,由于线程管理的额外开销,性能会下降(。

然后,第二个为文件公开良好异步接口的库是aiofile——请注意,它的名称非常相似,非常令人困惑。由于2.0.0,此库使用caio,因此使用本机linux支持异步文件操作。你应该坚持使用这个库来获得性能。

设计

回到最初的问题,你有一些选择的自由来实现它。最通用的解决方案是使用asyncio.gather来获得协同增益。

如果您将内容生产者和消费者分开,那么您可以将Nth生产者与N+1 th消费者合作:

async def get_content(...) -> Content:
# ...
# return Content(...)

async def process_content(content: Content, ...) -> ProcessedContent:
# ..., here is Your GPU delegation
# return ProcessedContent(...)

async def run_chunk(first_content: Content, ...) -> tuple[ProcessedContent, Content]:
# ...
first_content_processor_coro = process_content(first_content, ...)
second_content_coro = get_content(...)
second_content, first_process_result = await asyncio.gather(second_content_coro, first_content_processor_coro)
return first_process_result, second_content

run_chunk在阅读下一个文件时应该做与您在提问过程中描述的完全相同的事情。然后,您可以自己安排对run_chunk的调用,但它是按顺序运行的。

然而,为了获得大多数性能,我只需在生产者-消费者身上进行编排,然后在collect中运行整个批次:

async def get_and_process(...):
content = await get_content(...)
return await process_content(content)
async def run_batch(...):
processed_results = await asyncio.gather(*[get_and_process(...) for ... in X])

最新更新