我写了两种类型的绿素。MyGreenletPUB
将通过 ZMQ 发布消息类型 1 和消息类型 2 的消息。MyGreenletSUB
实例将根据参数("1"
和"2"
)订阅ZMQPUB
。
这里的问题是,当我在MyGreenletSUB
代码中启动我的 Greenlets 运行方法时,代码将在message = sock.recv()
停止,并且永远不会将运行时执行返回到其他 greenlet。
我的问题是我如何避免这种情况以及如何与while TRUE
异步启动我的 greenlets,而无需使用gevent.sleep()
in while 方法来切换 greenlet 之间的执行
from gevent.monkey import patch_all
patch_all()
import zmq
import time
import gevent
from gevent import Greenlet
class MyGreenletPUB(Greenlet):
def _run(self):
# ZeroMQ Context
context = zmq.Context()
# Define the socket using the "Context"
sock = context.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:5680")
id = 0
while True:
gevent.sleep(1)
id, now = id + 1, time.ctime()
# Message [prefix][message]
message = "1#".format(id=id, time=now)
sock.send(message)
# Message [prefix][message]
message = "2#".format(id=id, time=now)
sock.send(message)
id += 1
class MyGreenletSUB(Greenlet):
def __init__(self, b):
Greenlet.__init__(self)
self.b = b
def _run(self):
context = zmq.Context()
# Define the socket using the "Context"
sock = context.socket(zmq.SUB)
# Define subscription and messages with prefix to accept.
sock.setsockopt(zmq.SUBSCRIBE, self.b)
sock.connect("tcp://127.0.0.1:5680")
while True:
message = sock.recv()
print message
g = MyGreenletPUB.spawn()
g2 = MyGreenletSUB.spawn("1")
g3 = MyGreenletSUB.spawn("2")
try:
gevent.joinall([g, g2, g3])
except KeyboardInterrupt:
print "Exiting"
默认的 ZeroMQ.recv()
方法作案手法是阻止,直到到达任何内容,这些内容将传递给.recv()
方法调用者的手中。
对于确实聪明的非阻塞代理,请始终使用相当.poll()
实例方法和.recv( zmq.NOBLOCK )
。
请注意,ZeroMQ 订阅基于左侧的主题过滤器匹配,如果同时分发/收集混合的 unicode 和非 unicode 字符串,可能会出现问题。
此外,混合多个事件循环可能会变得有点棘手,具体取决于您的控制需求。我个人总是更喜欢非阻塞系统,即使以更复杂的设计工作为代价。