我正试图在一个永不消亡的独立线程上拥有一个消费者。使用者可以获得任意数量的任务来执行。
我一直在摆弄asyncio和gevent,但还没有找到任何有效的东西。下面是我尝试做的一个非常简单的例子
q = gevent.queue.SimpleQueue()
def run_task(task):
print(f"Started task {task} @ {time.time()}")
gevent.sleep(1)
print(f"Finished task {task} @ {time.time()}")
def consumer():
while True:
task = q.get()
print(f"Dequed task {task} for consumption")
gevent.spawn(run_task, task)
q.put(1)
q.put(2)
q.put(3)
consumer()
输出
Dequed task 1 for consumption
Dequed task 2 for consumption
Dequed task 3 for consumption
显然,run_task
从未针对任务执行过。我可以使用join()
,但任务是按顺序运行的。使用joinall()
似乎也不是一个可行的解决方案,因为队列将不断获得任意任务。
有什么办法吗?
您的代码有两个问题:
-
SimpleQueue是Python标准库队列的别名,需要使用gevent monkey补丁。
-
您正在主线程上运行您的消费者。
此代码按预期工作:
import gevent
import gevent.queue
import time
q = gevent.queue.Queue()
def run_task(task):
print(f"Started task {task} @ {time.time()}")
gevent.sleep(1)
print(f"Finished task {task} @ {time.time()}")
def consumer():
while True:
task = q.get()
print(f"Dequed task {task} for consumption")
gevent.spawn(run_task, task)
q.put(1)
q.put(2)
q.put(3)
gevent.spawn(consumer)
gevent.sleep(1000)