Python异步,aiohttp:动态更新任务循环应该工作到特定的条件



我有一个使用api的简单脚本。我创建number_of_user用户。我遍历循环,创建随机用户create_random_user(),为每个用户创建任务并将任务附加到循环中。用户创建任务async def fetch_user_create在收到响应后,在async def fetch_user_log中为用户日志创建另一个任务,并将其添加到所有任务中。我的问题是:我怎么能等到len(tasks_user) == 2 * number_of_user呢。

我试着放置await asyncio.sleep(1(-这是有效的,但取决于number_of_user。

目标:等待条件有可能吗?或者我做错了什么?

async def fetch_user_log(session, data_user):
async with session.post(url_user_login, data=data_user) as response:
async def fetch_user_create(session, data_user, tasks_user_create):
async with session.post(url_user_create, data=data_user) as response:
task2 = asyncio.ensure_future(fetch_user_log(session, data_user))
tasks_user.append(task2)
#await asyncio.sleep(1)
await response.read()

#await asyncio.gather(*tasks_user) - tried
async def run():
tasks_user = []
async with ClientSession() as session:
for i in range(number_of_user):
data_user = create_random_user()
task = asyncio.ensure_future(fetch_user_create(session, data_user, tasks_user_create))
tasks_user.append(task)
await asyncio.wait(tasks_user)
#await asyncio.gather(*tasks_user) - tried
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)

等待条件可以用asyncio实现。条件:

import asyncio
async def test_wait_for(cond, tasks):
print(".")
async with cond:
await cond.wait_for(lambda: (len(tasks)>3))
print("!")
async def test_add_task(cond, tasks):
for i in range(6):
print(i)
await asyncio.sleep(1)
async with cond:
tasks.append(i)
cond.notify_all()
async def run():
cond = asyncio.Condition()
tasks = []
asyncio.ensure_future(test_wait_for(cond, tasks))
await test_add_task(cond, tasks)


loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)

条件为事件+锁定。与notify_all一起发送的事件解锁所有waitwait_for协同程序。在此处锁定也会锁定任务数组。

它也可以用asyncio来实现。事件,如果您将条件移动到发送通知的任务。

最新更新