嗨,我正试图将此处理发送到不同的核心,因为它们都是彼此独立的,但是它们都没有等待,所以任务永远不会运行。我以为这就是未来的意义?
async def process_object(filename):
# await 1 - download file from S3
# await 2 - parse XML file
if "__main__" == __name__:
objects = get_objects(
bucket_name=bucket_name, prefix=prefix, file_extension=".xml", top_n=top_n
)
futures = []
with concurrent.futures.ProcessPoolExecutor(
multiprocessing.cpu_count()
) as executor:
futures = [executor.submit(process_object, filename) for filename in objects]
concurrent.futures.wait(futures)
如果要向ProcessPoolExecutor
提交任务,则不需要使用asyncio
。这些任务将在另一个进程中执行,因此它们已经在不使用asyncio的情况下并发运行。你的process_object
函数永远不会在你当前的代码中运行,因为协程在执行之前必须是awaited
。
也就是说,您想要这样的内容:
def process_object(filename):
# download file
# parse file
...
if "__main__" == __name__:
objects = get_objects(
bucket_name=bucket_name, prefix=prefix, file_extension=".xml", top_n=top_n
)
futures = []
with concurrent.futures.ProcessPoolExecutor(
multiprocessing.cpu_count()
) as executor:
futures = [executor.submit(process_object, filename) for filename in objects]
concurrent.futures.wait(futures)