如何添加新的项目asyncio队列,只要插槽是可用的?



代码如下:

名为"ids"的列表包含id号。我通过id号下载特定的消息。'nDownload'是列表索引。队列的大小值等于5。

我从列表中取出项目,每次下载一条消息并将其添加到队列中。当nDownload = 6时:

  1. QueueFull异常发生。
  2. 创建5个worker。
  3. Workers从消息中提取元数据用于其他目的。
  4. await queue.join()阻塞,直到队列中的所有项目都被获取和处理。
  5. 结束→删除员工。

代码可以工作,到目前为止我没有任何问题。

nDownload = 0
workers = []
while (nDownload <= len(ids)):                        
try:
async for msg in get_messages(channel,ids=ids[nDownload]):
nDownload = nDownload + 1
try:   
queue.put_nowait(msg)
except (asyncio.QueueFull,IndexError) as qErr:
nDownload = nDownload - 1 
workers = [asyncio.create_task(worker(queue)) for _ in range(5)] 
await queue.join() 
for cancel in workers:
cancel.cancel()                                    
except IndexError as iErr:
break    

问题:有时消息的大小不同。例如:

消息1 = 8分钟下载100MB

消息2 = 5秒下载1MB

一旦它下载了最短的消息(消息2),我就会在队列中获得一个免费的"插槽"。不幸的是,我必须等待消息1,因为queue.join()

如何在队列中添加新项目?

为什么我使用queue.join() ?因为我不知道如何添加最多5个消息队列,等待下载并恢复我真的需要下载一组消息,而不是一次全部下载由于

编辑:是的,我的worker是这样定义的(简化)

async def worker(queue):
while True:
queue_msg = await queue.get()
loop = asyncio.get_event_loop()
try:
task = loop.create_task(extract(queue_msg))
await asyncio.wait_for(task, timeout=timeout)
except errors.Fail:
#Here I have to requeue the message when it fails,
#so it requeues the ID in order to download the same msg later
await queue.put(queue_msg.id)
except asyncio.TimeoutError: 
#requeue the msg etcc...
finally:    
queue.task_done()
你的回答很聪明,谢谢然而,我选择队列的大小>因为当"提取"任务失败时,我需要重新获取消息。(抱歉我没告诉你)我不知道会发生什么,如果队列大小= 1,我尝试添加项目。这有点难

这不是完全清楚你的约束是什么,但如果我理解正确的话:

  • 你想并行下载最多5个东西
  • 你不想浪费时间-一旦工人完成了一个项目,它应该获得一个新的

队列大小应该与您的目的无关,它仅在工人暂时比get_messages快的情况下用作缓冲区。我甚至从队列大小为1开始,并试验更大的值是否有助于提高性能。

QueueFull上刷出任务似乎很奇怪而且没有必要。实现生产者-消费者模式的惯用方法是创建固定数量的消费者,并让它们在到达时处理多个项目。你没有显示worker,所以不清楚每个工人是否只处理一个消息,还是多个消息。

我将把这个循环重写为:
queue = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
for current in ids:
async for msg in get_messages(channel, id=current):
# enqueue msg, waiting (if needed) for a free slot in the queue
await queue.put(msg)
# wait for the remaining enqueued items to be processed
await queue.join()
# cancel the now-idle workers, which wait for a new message
# that will never arrive
for w in workers:
w.cancel()

worker应该这样定义:

async def worker(queue):
while True:
msg = await queue.get()
... process msg ...
queue.task_done()

最新更新