Gevent无限队列消费者



我正试图在一个永不消亡的独立线程上拥有一个消费者。使用者可以获得任意数量的任务来执行。

我一直在摆弄asyncio和gevent,但还没有找到任何有效的东西。下面是我尝试做的一个非常简单的例子

q = gevent.queue.SimpleQueue()

def run_task(task):
print(f"Started task {task} @ {time.time()}")
gevent.sleep(1)
print(f"Finished task {task} @ {time.time()}")

def consumer():
while True:
task = q.get()
print(f"Dequed task {task} for consumption")
gevent.spawn(run_task, task)

q.put(1)
q.put(2)
q.put(3)
consumer()

输出

Dequed task 1 for consumption
Dequed task 2 for consumption
Dequed task 3 for consumption

显然,run_task从未针对任务执行过。我可以使用join(),但任务是按顺序运行的。使用joinall()似乎也不是一个可行的解决方案,因为队列将不断获得任意任务。

有什么办法吗?

您的代码有两个问题:

  1. SimpleQueue是Python标准库队列的别名,需要使用gevent monkey补丁。

  2. 您正在主线程上运行您的消费者。

此代码按预期工作:

import gevent
import gevent.queue
import time
q = gevent.queue.Queue()
def run_task(task):
print(f"Started task {task} @ {time.time()}")
gevent.sleep(1)
print(f"Finished task {task} @ {time.time()}")
def consumer():
while True:
task = q.get()
print(f"Dequed task {task} for consumption")
gevent.spawn(run_task, task)
q.put(1)
q.put(2)
q.put(3)
gevent.spawn(consumer)
gevent.sleep(1000)

相关内容

  • 没有找到相关文章

最新更新