How do I collect wait()'d coroutines in a set?



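I have been trying to write a ping sweep that uses a limited number of processes. I tried as_completed without success, then switched to asyncio.wait with return_when=asyncio.FIRST_COMPLETED.

The complete script below works if the offending line is commented out. I would like to collect the tasks in a set to get rid of pending = list(pending), but pending_set.union(task) throws await wasn't used with future.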

"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []
    pending_set = set()
    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))


if __name__ == '__main__':
    asyncio.run(run())

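There are two problems with pending_set.union(task):

  • union does not update the set in place; it returns a new set made up of the original set and the elements of the iterables passed as arguments.

  • It accepts iterables (such as another set), not a single element. union therefore tries to iterate over task, which makes no sense. To be usable in yield from expressions, task objects are technically iterable, but they detect iteration attempts outside an async context and report the error you observed.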
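Both points can be checked in isolation. The following is only a minimal sketch: a bare Future created with loop.create_future() stands in for the task (Task is a Future subclass), and the names demo, sizes and message are made up for the illustration.

```python
import asyncio


async def demo():
    # A bare Future stands in for the task (Task subclasses Future).
    fut = asyncio.get_running_loop().create_future()

    pending_set = set()
    combined = pending_set.union([fut])          # returns a NEW set
    sizes = (len(pending_set), len(combined))    # (0, 1): original untouched

    try:
        pending_set.union(fut)                   # iterates the future itself
    except RuntimeError as exc:
        message = str(exc)                       # the error from the question
    else:
        message = None
    fut.cancel()                                 # tidy up the unused future
    return sizes, message


sizes, message = asyncio.run(demo())
print(sizes, message)
```

On the CPython versions I am aware of, the second union call fails with the same "await wasn't used with future" text quoted in the question.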

To fix both problems, use the add method instead, which operates by side effect and accepts a single element to add to the set:

pending_set.add(task)
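With add in place, the loop from the question can keep pending as a set throughout, because asyncio.wait already returns two sets. The sketch below assumes a hypothetical fake_ping coroutine that sleeps instead of spawning ping, so it can be run without a network; MAX_RUNNING and the results dict are likewise names introduced only for this example.

```python
import asyncio

MAX_RUNNING = 3  # same limit as in the question


async def fake_ping(ip):
    """Hypothetical stand-in for ping(): pretend every host answers."""
    await asyncio.sleep(0.01)
    return 0


async def run(iplist):
    todo = list(iplist)
    pending = set()          # stays a set for the whole loop
    tasks = {}               # task -> ip, as in the question
    results = {}
    while pending or todo:
        while len(pending) < MAX_RUNNING and todo:
            ip = todo.pop()
            task = asyncio.create_task(fake_ping(ip))
            tasks[task] = ip
            pending.add(task)            # add(), not union()
        # wait() hands back two sets, so no list() conversion is needed
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for taskdone in done:
            results[tasks.pop(taskdone)] = taskdone.result()
    return results


targets = ['127.0.0.1', '192.168.1.10', '192.168.1.20', '172.17.1.1']
results = asyncio.run(run(targets))
print(results)
```

Swapping fake_ping back for the real ping coroutine should leave the loop itself unchanged.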

Note that the more idiomatic way to limit concurrency in asyncio is to use a Semaphore. For example (untested):

async def run():
    limit = asyncio.Semaphore(3)

    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        print(asctime(), ip, ('BAD' if result else 'good'))

    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])
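To see that the semaphore really caps the number of simultaneous jobs, here is a small self-contained variant. It is a sketch under assumptions: asyncio.sleep replaces the external ping, and the running and peak counters are added purely for the measurement.

```python
import asyncio


async def run(ips, limit_count=3):
    limit = asyncio.Semaphore(limit_count)
    running = 0   # jobs currently inside the semaphore
    peak = 0      # highest value running ever reached

    async def wait_and_ping(ip):
        nonlocal running, peak
        async with limit:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)   # stand-in for: await ping(ip)
            running -= 1
        return ip

    # gather() returns results in the order the awaitables were passed
    finished = await asyncio.gather(*[wait_and_ping(ip) for ip in ips])
    return finished, peak


ips = [f"10.0.0.{n}" for n in range(1, 8)]
finished, peak = asyncio.run(run(ips))
print(finished, peak)
```

All seven coroutines are started at once, but only limit_count of them get past async with at any moment; the rest wait for a slot.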

Use await asyncio.gather(*pending_set)

  • asyncio.gather() accepts any number of awaitables and returns a single awaitable
  • * unpacks the set
    >>> "{} {} {}".format(*set((1,2,3)))
    '1 2 3'
    

Example from the documentation:

await asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
)
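Applied to a set of tasks, this might look like the sketch below. The square coroutine is hypothetical and exists only to give the tasks something to return. One thing to keep in mind: gather returns results in the order the awaitables were passed, and a set has no guaranteed order, so the results are sorted here.

```python
import asyncio


async def square(n):
    """Hypothetical coroutine used only for this illustration."""
    await asyncio.sleep(0)
    return n * n


async def main():
    pending_set = {asyncio.create_task(square(n)) for n in (2, 3, 4)}
    # gather() waits for every task; results follow the order in which the
    # set happened to be unpacked, so sort them if a stable order matters.
    return sorted(await asyncio.gather(*pending_set))


squares = asyncio.run(main())
print(squares)  # [4, 9, 16]
```

Unlike asyncio.wait with FIRST_COMPLETED, gather only returns once everything has finished, so it does not by itself let you top up the set as jobs complete.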

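I solved this without queuing the ping targets in my original application, which simplified things. This answer includes a target list that is received gradually, plus the useful pointers from @user4815162342. That completes the answer to the original question.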

import asyncio
import time

pinglist = ['127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
            '192.168.177.20', '192.168.177.100', '172.17.1.1']


async def worker(queue):
    limit = asyncio.Semaphore(4)    # restrict the rate of work

    async def ping(ip):
        """Run external ping."""
        async with limit:
            print(f"{time.time():.2f} starting {ip}")
            p = await asyncio.create_subprocess_exec(
                'ping', '-n', '1', ip,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            return (ip, await p.wait())

    async def get_assign():
        return await queue.get()

    assign = {asyncio.create_task(get_assign())}
    pending = set()

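Maintaining two different pending sets proved to be the key. One set holds the single task that receives assigned addresses; each time it completes, it has to be restarted. The other set holds the ping tasks, each of which runs once and is then finished.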

    while len(assign) + len(pending) > 0:  # stop condition
        done, pending = await asyncio.wait(
            set().union(assign, pending),
            return_when=asyncio.FIRST_COMPLETED
        )
        for job in done:
            if job in assign:
                if job.result() is None:
                    assign = set()  # for stop condition
                else:
                    pending.add(asyncio.create_task(ping(job.result())))
                    assign = {asyncio.create_task(get_assign())}
            else:
                print(
                    f"{time.time():.2f} result {job.result()[0]}"
                    f" {['good', 'BAD'][job.result()[1]]}"
                )

The rest is pretty straightforward.

async def assign(queue):
    """Assign tasks as if these are arriving gradually."""
    print(f"{time.time():.2f} start assigning")
    for task in pinglist:
        await queue.put(task)
        await asyncio.sleep(0.1)
    await queue.put(None)           # to stop nicely


async def main():
    queue = asyncio.Queue()
    await asyncio.gather(worker(queue), assign(queue))


if __name__ == '__main__':
    asyncio.run(main())

The output of this is (on my network, the 172 address does not respond):

1631611141.70 start assigning
1631611141.70 starting 127.0.0.1
1631611141.71 result 127.0.0.1 good
1631611141.80 starting 192.168.1.10
1631611141.81 result 192.168.1.10 good
1631611141.91 starting 192.168.1.20
1631611142.02 starting 192.168.1.254
1631611142.03 result 192.168.1.254 good
1631611142.13 starting 192.168.177.20
1631611142.23 starting 192.168.177.100
1631611142.24 result 192.168.177.100 good
1631611142.34 starting 172.17.1.1
1631611144.47 result 192.168.1.20 good
1631611145.11 result 192.168.177.20 good
1631611145.97 result 172.17.1.1 BAD
