ZMQ客户端-工作人员通信模式



阅读ZeroMQ文档时,当我发现这三个套接字组合时,我有点不知所措。它们是:

  • 经销商到路由器
  • 经销商对经销商
  • 路由器到路由器

我知道DEALER和ROUTER是同步REQ/REP通信的替代品,因此它们变得异步,并且可以连接多个节点。我不明白的是,交易商如何可以替代交易商对交易商中的REQ和REP(以及路由器对路由器中的路由器)。

我正在寻找一种模式,允许任意数量的客户端向任意数量的工作人员提交作业,处理(负载平衡)这些作业并向客户端返回响应(和中间结果)(异步,但发送多条消息)。客户端可能还需要能够提前终止工作。我发现文档在这方面有点轻(我不是专家,可能错过了相关部分)。

我很乐意自己解决细节,但每次我认为我找到了一个合适的模式,我就会发现另一个可能同样合适的模式(例如,在我看来,这3种模式同样合适:http://zguide.zeromq.org/page:all#ROUTER-经纪人和REQ工作人员,http://zguide.zeromq.org/page:all#ROUTER-经纪人和交易商工作人员,http://zguide.zeromq.org/page:all#A-负载平衡消息代理)。

任何关于结构(哪个套接字用于哪个组件进行通信)的建议都是值得赞赏的。

更新

这就是我到目前为止想到的:

import multiprocessing
import zmq
import time
router_url_b = 'tcp://*:5560'
router_url = 'tcp://localhost:5560'
dealer_url_b = 'tcp://*:5561'
dealer_url = 'tcp://localhost:5561'

def broker():
context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind(router_url_b)
dealer = context.socket(zmq.DEALER)
dealer.bind(dealer_url_b)
poll = zmq.Poller()
poll.register(router, zmq.POLLIN)
poll.register(dealer, zmq.POLLIN)
while True:
poll_result = dict(poll.poll())
if router in poll_result:
ident, msg = router.recv_multipart()
print 'router: ident=%s, msg=%s' % (ident, msg)
# print 'router received "%s" and ident %s' % (msg, ident)
dealer.send_multipart([ident, msg])
# dealer.send(msg)
if dealer in poll_result:
ident, msg = dealer.recv_multipart()
print 'dealer: ident=%s, msg=%s' % (ident, msg)
router.send_multipart([ident, msg])

def client(client_id):
context = zmq.Context()
req = context.socket(zmq.DEALER)
# setting identity doesn't seem to make a difference
req.setsockopt(zmq.IDENTITY, b"%s" % client_id)
req.connect(router_url)
req.send('work %d' % client_id)
while True:
msg = req.recv()
print 'client %d received response: %s' % (client_id, msg)

def worker(worker_id):
context = zmq.Context()
# to allow asynchronous sending of responses.
rep = context.socket(zmq.ROUTER)
# not sure if this is required...
# rep.setsockopt(zmq.IDENTITY, b"%s" % (10+worker_id))
rep.connect(dealer_url)
while True:
msg = rep.recv_multipart()
ident, msg = msg[:-1], msg[-1]
print 'worker %d received: "%s", ident="%s"' % (worker_id, msg, ident)
# do some work...
time.sleep(10)
rep.send_multipart(ident + ['result A from worker %d (%s)' % (worker_id, msg)])
# do more work...
time.sleep(10)
rep.send_multipart(ident + ['result B from worker %d (%s)' % (worker_id, msg)])
print 'finished worker', worker_id

def main():
print 'creating workers'
for i in xrange(2):
p = multiprocessing.Process(target=worker, args=(i, ))
p.daemon = True
p.start()
print 'creating clients'
for i in xrange(5):
p = multiprocessing.Process(target=client, args=(i, ))
p.daemon = True
p.start()
broker()

if __name__ == '__main__':
main()

它似乎运行得很好。唯一缺少的是在工作人员开始处理工作后从客户端到工作人员的通信。我想最好的想法是创建一个新的某种控制通道(pub/sub),以便在需要时终止工作人员。

还有几个问题:

  • 这个模型有什么明显的弱点吗
  • IDENTITY有什么用?如果我设置了这些值(无论是在客户端还是在工作者中),似乎都无关紧要
  • 工人收到的第一条信息是: worker 1 received: "work 3", ident="['x00x80x00Axa7', '3']" worker 0 received: "work 4", ident="['x00x80x00Axa7', '4']" 为什么两个工人的第一个ident项目相同?我理解路由器工作的方式是分配随机身份,它会跟踪这些身份。这是如何工作的(在一个小规模的例子中似乎是这样)

除了我的更新,我发现工作人员可以使用DEALER连接到服务器的后端。模式和解释可以在这里找到。

客户端使用DEALER套接字,服务器在前端以路由器的形式接收请求(asyn+许多客户端),使用DEALER套接字(asyn)将请求代理到工作进程(后端),工作进程在DEALER插座上侦听服务器的后端(asyn,无需路由,尽管ROUTER也工作)。

如果工人是严格同步的,我们会使用REP,但由于要发送多个回复,我们需要一个异步套接字。我们不想路由回复;它们总是转到发送我们的请求。

一个进一步的修改是用zmq.proxy(router, dealer)(broker()中的while True循环)替换路由器/经销商消息的隐式调度。

更新

显然,这个模式使用了ZMQ的标准循环路由。自定义任务分配可以通过ROUTER到ROUTER模式来实现。在这种情况下,客户端从发送请求开始,工作者从发送就绪消息开始。代理管理一个准备就绪的工作程序列表,如果没有可用的工作程序,则关闭对新客户端消息的轮询(从而使用ZMQ的内部消息缓冲区)。

最新更新