如何在调用task_done之前将项目放入JoinableQueue



我创建了一个JoinableQueue实例,并向其中添加了一些项目。然后创建消费者工作者来消费Queue中的这些项目。

然而,问题是我需要能够在这些worker中向队列中添加新项目,但当在q.get()之后没有调用task_done()时,Gevent不允许我执行此操作。如果我在调用task_done()后将newItems添加到队列中,我无法确保所有项目都已消耗。

它抛出gevent.hub.LoopExit: This operation would block forever

q = JoinableQueue()
def worker():
    item = q.get()
    newItems = consumeItem(item)
    [q.put(newItem) for newItem in newItems]
    q.task_done()
for item in initialItems:
    q.put(item)
for i in range(10):
    gevent.spawn(worker)
q.join() # I have to be sure all items are consumed when join stops blocking the program.

如何解决此问题?

您的工人只运行一次。如果您的队列中的项目数超过了派生的工作项数,则它永远不会结束。

解决方案很简单,只需向工人添加一个循环:

def worker():
    for item in iter(q.get, None):
        newItems = consumeItem(item)
        [q.put(newItem) for newItem in newItems]
        q.task_done()

现在,所有这些都继续工作,直到队列为空。

相关内容

  • 没有找到相关文章

最新更新