我正在尝试实现一个简单的gevent设置。有一个发送者应该并发发送一个值给几个等待者。Event
类最接近于解决这个问题,如下所示。
每三秒,setter创建一个事件,该事件解除所有等待者的阻塞。事件之后立即被清除,所以服务员会再次阻塞,直到下一次。
import gevent
from gevent.event import Event
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
while True:
gevent.sleep(3)
evt.set()
evt.clear()
def waiter(arg):
while True:
evt.wait()
print("waiter {}".format(arg))
def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter,1),
gevent.spawn(waiter,2),
gevent.spawn(waiter,3),
])
if __name__ == '__main__': main()
现在,我只需要这样做,并向服务员传递一个值。显而易见的选择是使用AsyncResult
。然而,这是不可能的clear
AsyncResult对象,所以等待者结束在一个无限循环。
你有什么想法如何实现这个吗?
我认为最好的办法是使用队列。我创建了一个BroadcastQueue
类,使它更容易管理发送一个值给许多消费者。生产者调用BroadcastQueue.broadcast()
,它将向所有注册的消费者发送一个值。消费者通过调用BroadcastQueue.register
来注册,它返回一个唯一的gevent.queue.Queue()
对象。消费者然后使用该对象来get
来自生产者的消息。
import gevent
from gevent.queue import Queue
class BroadcastQueue(object):
def __init__(self):
self._queues = []
def register(self):
q = Queue()
self._queues.append(q)
return q
def broadcast(self, val):
for q in self._queues:
q.put(val)
def setter(bqueue):
'''After 3 seconds, wake all threads waiting on the value of evt'''
while True:
gevent.sleep(3)
bqueue.broadcast("hi")
def waiter(arg, bqueue):
queue = bqueue.register()
while True:
val = queue.get()
print("waiter {} {}".format(arg, val))
def main():
bqueue = BroadcastQueue()
gevent.joinall([
gevent.spawn(setter, bqueue),
gevent.spawn(waiter, 1, bqueue),
gevent.spawn(waiter, 2, bqueue),
gevent.spawn(waiter, 3, bqueue),
])
if __name__ == '__main__':
main()
输出:waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
为什么不直接使用可变对象来存储您想要发送的内容,并将引用传递给setter和await呢?setter可以在调用set之前把它改成什么?见下文:
import gevent
from gevent.event import Event
evt = Event()
def setter(arg):
'''After 3 seconds, wake all threads waiting on the value of arg['it']'''
while True:
gevent.sleep(3)
arg['it'] += 1
evt.set()
evt.clear()
def waiter(num, arg):
while True:
evt.wait()
print("waiter {} {}".format(num, arg['it']))
def main():
THING = {'it': 1}
gevent.joinall([
gevent.spawn(setter, THING),
gevent.spawn(waiter, 1, THING),
gevent.spawn(waiter, 2, THING),
gevent.spawn(waiter, 3, THING),
])
if __name__ == '__main__': main()
输出:waiter 2 2
waiter 1 2
waiter 3 2
waiter 2 3
waiter 1 3
waiter 3 3
waiter 2 4
waiter 1 4
waiter 3 4
您可以在Event本身的派生类中添加此行为。
这里我们添加了一个get()
方法,它只使用wait()
, setval()
方法也使用set()
import gevent
from gevent.event import Event
class Evt(Event):
def __init__(self):
super().__init__()
self._val = None
def setval(self, val):
self._val = val
self.set()
def get(self):
self.wait()
return self._val
evt = Evt()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
n = 0
while True:
gevent.sleep(3)
evt.setval(n)
evt.clear()
n += 1
def waiter(arg):
while True:
val = evt.get()
print("waiter {}: val = {}".format(arg, val))
def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter, 1),
gevent.spawn(waiter, 2),
gevent.spawn(waiter, 3),
])
if __name__ == '__main__':
main()