gevent -将一个值并行地传递给多个greenlet



我正在尝试实现一个简单的gevent设置。有一个发送者应该并发发送一个值给几个等待者Event类最接近于解决这个问题,如下所示。

每三秒,setter创建一个事件,该事件解除所有等待者的阻塞。事件之后立即被清除,所以服务员会再次阻塞,直到下一次。

import gevent
from gevent.event import Event
evt = Event()
def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    while True:
        gevent.sleep(3)
        evt.set()
        evt.clear()
def waiter(arg):
    while True:
        evt.wait()
        print("waiter {}".format(arg))
def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter,1),
        gevent.spawn(waiter,2),
        gevent.spawn(waiter,3),
    ])
if __name__ == '__main__': main()

现在,我只需要这样做,并向服务员传递一个值。显而易见的选择是使用AsyncResult。然而,这是不可能的clear AsyncResult对象,所以等待者结束在一个无限循环。

你有什么想法如何实现这个吗?

我认为最好的办法是使用队列。我创建了一个BroadcastQueue类,使它更容易管理发送一个值给许多消费者。生产者调用BroadcastQueue.broadcast(),它将向所有注册的消费者发送一个值。消费者通过调用BroadcastQueue.register来注册,它返回一个唯一的gevent.queue.Queue()对象。消费者然后使用该对象来get来自生产者的消息。

import gevent
from gevent.queue import Queue

class BroadcastQueue(object):
    def __init__(self):
        self._queues = []
    def register(self):
        q = Queue()
        self._queues.append(q)
        return q
    def broadcast(self, val):
        for q in self._queues:
            q.put(val)

def setter(bqueue):
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    while True:
        gevent.sleep(3)
        bqueue.broadcast("hi")
def waiter(arg, bqueue):
    queue = bqueue.register()
    while True:
        val = queue.get()
        print("waiter {} {}".format(arg, val))
def main():
    bqueue = BroadcastQueue()
    gevent.joinall([
        gevent.spawn(setter, bqueue),
        gevent.spawn(waiter, 1, bqueue),
        gevent.spawn(waiter, 2, bqueue),
        gevent.spawn(waiter, 3, bqueue),
    ])
if __name__ == '__main__':
    main()
输出:

waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi

为什么不直接使用可变对象来存储您想要发送的内容,并将引用传递给setter和await呢?setter可以在调用set之前把它改成什么?见下文:

import gevent
from gevent.event import Event
evt = Event()
def setter(arg):
    '''After 3 seconds, wake all threads waiting on the value of arg['it']'''
    while True:
        gevent.sleep(3)
        arg['it'] += 1
        evt.set()
        evt.clear()
def waiter(num, arg):
    while True:
        evt.wait()
        print("waiter {} {}".format(num, arg['it']))
def main():
    THING = {'it': 1}
    gevent.joinall([
        gevent.spawn(setter, THING),
        gevent.spawn(waiter, 1, THING),
        gevent.spawn(waiter, 2, THING),
        gevent.spawn(waiter, 3, THING),
    ])
if __name__ == '__main__': main()
输出:

waiter 2 2
waiter 1 2
waiter 3 2
waiter 2 3
waiter 1 3
waiter 3 3
waiter 2 4
waiter 1 4
waiter 3 4

您可以在Event本身的派生类中添加此行为。

这里我们添加了一个get()方法,它只使用wait(), setval()方法也使用set()

import gevent
from gevent.event import Event
class Evt(Event):
    def __init__(self):
        super().__init__()
        self._val = None
    def setval(self, val):
        self._val = val
        self.set()
    def get(self):
        self.wait()
        return self._val
evt = Evt()
def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    n = 0
    while True:
        gevent.sleep(3)
        evt.setval(n)
        evt.clear()
        n += 1
def waiter(arg):
    while True:
        val = evt.get()
        print("waiter {}: val = {}".format(arg, val))
def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter, 1),
        gevent.spawn(waiter, 2),
        gevent.spawn(waiter, 3),
    ])
if __name__ == '__main__':
    main()

最新更新