我有一个在gevent中运行的原始生产者/消费者脚本。它启动了一些生产者函数,将东西放入gevent.queue.Queue
,以及一个消费者函数,将它们从队列中取出:
from __future__ import print_function
import time
import gevent
import gevent.queue
import gevent.monkey
q = gevent.queue.Queue()
# define and spawn a consumer
def consumer():
while True:
item = q.get(block=True)
print('consumer got {}'.format(item))
consumer_greenlet = gevent.spawn(consumer)
# define and spawn a few producers
def producer(ID):
while True:
print("producer {} about to put".format(ID))
q.put('something from {}'.format(ID))
time.sleep(0.1)
# consumer_greenlet.switch()
producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
gevent.monkey.patch_all()
print("about to join")
consumer_greenlet.join()
如果我让gevent隐式地处理调度(例如通过调用time),它会工作得很好。sleep或其他一些gevent.monkey.patch()
ed函数),但是当我显式切换到消费者(用注释掉的switch
调用替换time.sleep
)时,gevent会引发AssertionError:
Traceback (most recent call last):
File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
result = self._run(*self.args, **self.kwargs)
File "switch_test.py", line 14, in consumer
item = q.get(block=True)
File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get
assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
AssertionError: Invalid switch into Queue.get: ()
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError
我想采用显式切换,因为在生产中我有很多生产者,gevent的调度没有为消费者分配足够的运行时间,队列变得越来越长(这很糟糕)。或者,任何关于如何配置或修改gevent的调度程序的见解都是非常感谢的。
这是在Python 2.7.2, gevent 1.0.1和greenlet 0.4.5上。
在我看来,显式开关与隐式开关并不能很好地配合。您已经发生了隐式切换,要么是因为猴子补丁I/O,要么是因为gevent.queue.Queue()
.
gevent文档不鼓励使用原始的greenlet方法:
作为一个greenlet子类,greenlet也有switch()和throw()方法。但是,这些不应该在应用程序级别使用它们很容易导致永远没有计划的绿地。选择更高级的安全类,比如Event和Queue。
迭代gevent.queue.Queue()
或访问队列的get
方法会进行隐式切换,有趣的是put
不会。所以你必须自己生成一个隐式线程开关。最简单的方法是调用gevent.sleep(0)
(您不必实际等待特定的时间)。
总之,只要你的代码中没有阻塞的IO操作,你甚至不需要猴子操作。
我会这样重写你的代码:
import gevent
import gevent.queue
q = gevent.queue.Queue()
# define and spawn a consumer
def consumer():
for item in q:
print('consumer got {}'.format(item))
consumer_greenlet = gevent.spawn(consumer)
# define and spawn a few producers
def producer(ID):
print('producer started', ID)
while True:
print("producer {} about to put".format(ID))
q.put('something from {}'.format(ID))
gevent.sleep(0)
producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
print("about to join")
consumer_greenlet.join()