如何在Python中将参数传递给多个异步任务



现在我有一些代码如下:

userinput1 = abc.....
userinput2 = abc.....
userinput3 = abc.....

async def task1():
do something with userinput1...
do another thing...

async def task2():
do something with userinput2...
do another thing...

async def task3():
do something with userinput3...
do another thing...

async def main():
await asyncio.wait([task1() , task2(), task3()])

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

正如您在上面看到的,我有3个异步函数,它们同时做不同的事情。我想知道是否有任何方法可以根据用户输入轻松创建许多功能?本质上,我想让它能够做到这一点:

userinput1 = abc.....
userinput2 = abc.....
userinput3 = abc.....
userinput4 = abc.....
amount_of_needed_functions = 4

一旦它得到了这些数据,它就会像这样运行:

async def task1():
do something with userinput1...
do another thing...

async def task2():
do something with userinput2...
do another thing...

async def task3():
do something with userinput3...
do another thing...

async def task4():
do something with userinput4...
do another thing...

async def main():
await asyncio.wait([task1() , task2(), task3(), task4()])

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

因此,它几乎会使函数基于某些可验证性(如userinput1(,然后无论指定多少次都这样做(amount_of_eneeded_functions(,然后同时运行所有这些。抱歉,这是一个有点令人困惑的问题,但我很不知道从哪里开始研究这个问题。谢谢

将用户输入作为参数传递给每个任务:

多任务的单一功能

import asyncio
async def function(user_input, input_index):
print(f'In {input_index} function: {user_input}')

async def main():
tasks = []
for input_index in range(1, 4):
user_input = input(f"Enter input #{input_index}n")
tasks.append(asyncio.create_task(function(user_input, input_index)))
await asyncio.gather(*tasks)

if __name__ == '__main__':
asyncio.run(main())

多任务的多功能

使用字典为每个输入调用所需的方法。

import asyncio
async def function1(user_input, input_index):
print(f'In {input_index} function1: {user_input}')
async def function2(user_input, input_index):
print(f'In {input_index} function2: {user_input}')
async def function3(user_input, input_index):
print(f'In {input_index} function3: {user_input}')

FUNCTION_DICTIONARY = { 1 : function1, 2 : function2, 3 : function3 }
async def main():
tasks = []
for input_index in range(1, 4):
user_input = input(f"Enter input #{input_index}n")
tasks.append(asyncio.create_task(FUNCTION_DICTIONARY[input_index](user_input, input_index)))
await asyncio.gather(*tasks)

if __name__ == '__main__':
asyncio.run(main())

如果所有函数都做相同的事情,则可以尝试这样的操作


inputs = ['a', 'b', 'c']
async def task(input: str):
# Do stuff / await stuff
return input

async def main()
await asyncio.wait(
[task(arg) for arg in inputs]
)

最新更新