我创建了一个JoinableQueue实例,并向其中添加了一些项目。然后创建消费者工作者来消费Queue中的这些项目。
然而,问题是我需要能够在这些worker中向队列中添加新项目,但当在q.get()之后没有调用task_done()时,Gevent不允许我执行此操作。如果我在调用task_done()后将newItems添加到队列中,我无法确保所有项目都已消耗。
它抛出gevent.hub.LoopExit: This operation would block forever
q = JoinableQueue()
def worker():
item = q.get()
newItems = consumeItem(item)
[q.put(newItem) for newItem in newItems]
q.task_done()
for item in initialItems:
q.put(item)
for i in range(10):
gevent.spawn(worker)
q.join() # I have to be sure all items are consumed when join stops blocking the program.
如何解决此问题?
您的工人只运行一次。如果您的队列中的项目数超过了派生的工作项数,则它永远不会结束。
解决方案很简单,只需向工人添加一个循环:
def worker():
for item in iter(q.get, None):
newItems = consumeItem(item)
[q.put(newItem) for newItem in newItems]
q.task_done()
现在,所有这些都继续工作,直到队列为空。