现在我有一些代码如下:
userinput1 = abc.....
userinput2 = abc.....
userinput3 = abc.....
async def task1():
do something with userinput1...
do another thing...
async def task2():
do something with userinput2...
do another thing...
async def task3():
do something with userinput3...
do another thing...
async def main():
await asyncio.wait([task1() , task2(), task3()])
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
正如您在上面看到的,我有3个异步函数,它们同时做不同的事情。我想知道是否有任何方法可以根据用户输入轻松创建许多功能?本质上,我想让它能够做到这一点:
userinput1 = abc.....
userinput2 = abc.....
userinput3 = abc.....
userinput4 = abc.....
amount_of_needed_functions = 4
一旦它得到了这些数据,它就会像这样运行:
async def task1():
do something with userinput1...
do another thing...
async def task2():
do something with userinput2...
do another thing...
async def task3():
do something with userinput3...
do another thing...
async def task4():
do something with userinput4...
do another thing...
async def main():
await asyncio.wait([task1() , task2(), task3(), task4()])
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
因此,它几乎会使函数基于某些可验证性(如userinput1(,然后无论指定多少次都这样做(amount_of_eneeded_functions(,然后同时运行所有这些。抱歉,这是一个有点令人困惑的问题,但我很不知道从哪里开始研究这个问题。谢谢
将用户输入作为参数传递给每个任务:
多任务的单一功能
import asyncio
async def function(user_input, input_index):
print(f'In {input_index} function: {user_input}')
async def main():
tasks = []
for input_index in range(1, 4):
user_input = input(f"Enter input #{input_index}n")
tasks.append(asyncio.create_task(function(user_input, input_index)))
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
多任务的多功能
使用字典为每个输入调用所需的方法。
import asyncio
async def function1(user_input, input_index):
print(f'In {input_index} function1: {user_input}')
async def function2(user_input, input_index):
print(f'In {input_index} function2: {user_input}')
async def function3(user_input, input_index):
print(f'In {input_index} function3: {user_input}')
FUNCTION_DICTIONARY = { 1 : function1, 2 : function2, 3 : function3 }
async def main():
tasks = []
for input_index in range(1, 4):
user_input = input(f"Enter input #{input_index}n")
tasks.append(asyncio.create_task(FUNCTION_DICTIONARY[input_index](user_input, input_index)))
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
如果所有函数都做相同的事情,则可以尝试这样的操作
inputs = ['a', 'b', 'c']
async def task(input: str):
# Do stuff / await stuff
return input
async def main()
await asyncio.wait(
[task(arg) for arg in inputs]
)