Fetching the requests in a queue concurrently



I have written code that lets me start fetching the next chunk of data from the API while the previous chunk is still being processed.

I would like up to 5 chunks to be fetched concurrently at any given moment, but the returned data should always be processed in the correct order, even if the last request in the queue completes before any of the others.

How can I change my code to achieve this?

import asyncio
import urllib.parse
from typing import List, Optional

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)
        if select_outbound:
            select_outbound = ",".join(select_outbound)
        queue = asyncio.Queue(maxsize=queue_size)
        # TODO: I want to be able to fill the queue with requests that are already executing
        async def queued_chunks():
            # `chunks` is a helper (not shown) that yields fixed-size slices of entity_ids
            for ids in chunks(entity_ids, chunk_size):
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            await queue.put(None)
        asyncio.create_task(queued_chunks())
        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity

I would use two queues here: one holding the chunks to process, and one for the completed chunks. You can have any number of worker tasks processing chunks, and you can put a size limit on the first queue to limit how many chunks you prefetch. Use a single loop to receive the processed chunks, so that they are kept in order (your code already does this).

The trick is to put futures into both queues, one for each chunk to be processed. The worker tasks that do the processing take a (chunk, future) pair and then have to resolve the associated future by setting the POST response as that future's result. The loop that handles the processed chunks awaits each future, so it only moves on to the next chunk once the current chunk has been fully processed. For this to work, you put each chunk together with its corresponding future into the first queue for the workers to consume, and you put the same future into the second queue; the second queue is what enforces that the chunk results are processed in order.
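If the future mechanism itself is unfamiliar, here is a minimal standalone illustration of the idea (the names resolver and main are purely illustrative and not part of the code below): one task sets the result on a future while another coroutine is already awaiting it, which is exactly what the workers and the ordered consumer loop do in the sketch.

import asyncio

async def resolver(future: asyncio.Future) -> None:
    # Simulate some work, then publish the result on the future.
    await asyncio.sleep(0.1)
    future.set_result("chunk result")

async def main() -> None:
    future = asyncio.get_running_loop().create_future()
    asyncio.create_task(resolver(future))
    # Awaiting the future blocks until resolver() has set its result.
    print(await future)

asyncio.run(main())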

So, to summarise:

  • There are two queues:
    1. chunk_queue holds (chunk, future) pairs.
    2. completed_queue holds futures, the *same* futures that are paired with the chunks in the other queue.
  • Create "worker" tasks that consume from the chunk queue. If you create 5 of them, 5 chunks will be processed in parallel. Whenever a worker completes processing a chunk, it sets the result on the corresponding future.
  • Use a "processed chunks" loop; it takes the next future from completed_queue and awaits it. Only when the specific chunk associated with that future has been completed (its result set by a worker task) will it yield the result.

As a rough sketch, it would look something like this:

chunk_queue = asyncio.Queue()
completed_queue = asyncio.Queue()
WORKER_COUNT = queue_size

async def queued_chunks():
    # Pair each chunk with a future; workers resolve it, the consumer awaits it in order.
    for ids in chunks(entity_ids, chunk_size):
        future = asyncio.Future()
        await chunk_queue.put((ids, future))
        await completed_queue.put(future)
    await completed_queue.put(None)

async def worker():
    while True:
        ids, future = await chunk_queue.get()
        try:
            res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                "entityIds": ids,
                "objects": objects,
                "inbound": {
                    "linkTypeIds": select_outbound,
                    "objects": objects,
                } if select_inbound else {},
                "outbound": {
                    "linkTypeIds": select_inbound,
                    "objects": objects,
                } if select_outbound else {},
            })
            res.raise_for_status()
            future.set_result(res)
        except Exception as e:
            # Stop this worker after a failure; the future carries the exception.
            future.set_exception(e)
            return

workers = [asyncio.create_task(worker()) for _ in range(WORKER_COUNT)]
chunk_producer = asyncio.create_task(queued_chunks())
try:
    while True:
        future = await completed_queue.get()
        if future is None:
            # all chunks have been processed!
            break
        res = await future
        for entity in res.json():
            yield entity
finally:
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)

If you have to limit the number of chunks that are queued up (and not just the number being processed concurrently), set a maxsize on chunk_queue (with a value greater than WORKER_COUNT). Use this, for example, to limit memory requirements.
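For example, one way to cap prefetching at twice the worker count (the factor of 2 here is just an illustrative choice, not something the answer prescribes):

WORKER_COUNT = queue_size
# Up to WORKER_COUNT chunks being fetched, plus the same number prefetched and waiting.
chunk_queue = asyncio.Queue(maxsize=WORKER_COUNT * 2)
completed_queue = asyncio.Queue()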

However, if you were going to set maxsize to a value equal to WORKER_COUNT, you might as well get rid of the worker tasks altogether and instead put the body of the worker loop, as a coroutine wrapped in a task, into the completed-results queue. The asyncio Task class is a subclass of Future, and it automatically sets the future's result when the coroutine it wraps completes. If you are not going to put more chunks into chunk_queue than you have worker tasks, you may as well cut out the middleman and drop chunk_queue entirely. The tasks then go into the completed queue instead of plain futures:

completed_queue = asyncio.Queue(maxsize=queue_size)

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        task = asyncio.create_task(fetch_task(ids))
        await completed_queue.put(task)
    await completed_queue.put(None)

async def fetch_task(ids):
    res = await self.client.post(
        urllib.parse.quote("entities:fetchdata"),
        json={
            "entityIds": ids,
            "objects": objects,
            "inbound": {
                "linkTypeIds": select_outbound,
                "objects": objects,
            } if select_inbound else {},
            "outbound": {
                "linkTypeIds": select_inbound,
                "objects": objects,
            } if select_outbound else {},
        },
    )
    res.raise_for_status()
    return res

chunk_producer = asyncio.create_task(queued_chunks())
while True:
    task = await completed_queue.get()
    if task is None:
        # all chunks have been processed!
        break
    res = await task
    for entity in res.json():
        yield entity

This version is very close to what you already have; the only difference is that the awaiting of the client POST coroutine and the check of the response status code are moved into a separate coroutine that runs as a task. You could also put just the self.client.post() coroutine into a task (and so not await it there) and leave checking the response status to the final queue-processing loop. That is what Pablo's answer proposes, so I won't repeat it here.

Note that this version starts each task before putting it into the queue. The queue is therefore not the only limit on the number of active tasks: there can also be one already-started task waiting for space at one end (the await completed_queue.put(task) line blocks if the queue is full), and another task that has already been taken out by the queue consumer (via task = await completed_queue.get()). If you need to limit the number of active tasks, subtract 2 from the queue maxsize to set your upper bound.

Also, because tasks can complete concurrently, there may at times be fewer active tasks than the maxsize + 1 ceiling, yet no new tasks can be started until more space frees up in the queue. Because the first approach queues the inputs to the tasks, it does not have these issues. You could mitigate this by using a semaphore rather than a bounded queue size to limit tasks (acquire a slot before starting a task, and release it just before the task returns).
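As a rough sketch of that semaphore variant, assuming the same surrounding fetch_entities context as the second version above, with completed_queue left unbounded so the semaphore is the only limit; the limit of 5 is illustrative, the inbound/outbound parts of the payload are omitted for brevity, and the slot is acquired at the start of the task body rather than before create_task, which caps in-flight requests the same way:

completed_queue = asyncio.Queue()   # unbounded; the semaphore does the limiting
limit = asyncio.Semaphore(5)        # at most 5 fetches in flight at once

async def fetch_task(ids):
    # The slot is held only while this task's request is running.
    async with limit:
        res = await self.client.post(
            urllib.parse.quote("entities:fetchdata"),
            json={"entityIds": ids, "objects": objects},  # inbound/outbound omitted for brevity
        )
        res.raise_for_status()
        return res

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        # Every task is created immediately, but only 5 POSTs run at any one time.
        await completed_queue.put(asyncio.create_task(fetch_task(ids)))
    await completed_queue.put(None)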

Personally, I would go with my first suggestion, as it lets you control concurrency and chunk prefetching separately, without the problems the second approach has.

Instead of awaiting the coroutine before queueing it, queue it as a task and await it later:

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)
        if select_outbound:
            select_outbound = ",".join(select_outbound)
        queue = asyncio.Queue(maxsize=queue_size)

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                cor = self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                # Wrap the coroutine in a task so the request starts now, and queue the task.
                task = asyncio.create_task(cor)
                await queue.put(task)
            await queue.put(None)

        asyncio.create_task(queued_chunks())
        while True:
            task = await queue.get()
            if task is None:
                break
            res = await task
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
