我想写一个python
脚本(称为parent),它做以下事情:
(1)定义了一个多维numpy
数组
(2) forks
10个不同的python
脚本(称它们为子脚本)它们中的每一个都必须能够从(1)在任何一个时间点(只要它们是活的)read
numpy
数组的内容。
(3)每个子脚本将完成它自己的工作(子不相互共享任何信息)
(4)在任何时间点,父脚本必须能够接受来自其所有子脚本的消息。这些消息将被父解析,并导致(1)中的numpy
数组发生变化。
在Linux
环境中在python
中工作时,我如何做到这一点?我想使用zeroMQ
,并有父是一个单一的订阅者,而子将所有发布者;这有意义吗?还是有更好的方法?
另外,如何允许所有子连续读取父定义的numpy
数组的内容?
sub
通道不一定是要绑定的通道,因此您可以让订阅者绑定,并且每个pub
子通道都可以连接到该通道并发送它们的消息。在这种特殊情况下,我认为multiprocessing
模块更适合,但我认为它值得一提:
import zmq
import threading
# So that you can copy-and-paste this into an interactive session, I'm
# using threading, but obviously that's not what you'd use
# I'm the subscriber that multiple clients are writing to
def parent():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, 'Child:')
# Even though I'm the subscriber, I'm allowed to get this party
# started with `bind`
socket.bind('tcp://127.0.0.1:5000')
# I expect 50 messages
for i in range(50):
print 'Parent received: %s' % socket.recv()
# I'm a child publisher
def child(number):
context = zmq.Context()
socket = context.socket(zmq.PUB)
# And even though I'm the publisher, I can do the connecting rather
# than the binding
socket.connect('tcp://127.0.0.1:5000')
for data in range(5):
socket.send('Child: %i %i' % (number, data))
socket.close()
threads = [threading.Thread(target=parent)] + [threading.Thread(target=child, args=(i,)) for i in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
特别是,文档的核心消息传递模式部分讨论了这样一个事实,即对于模式,任何一方都可以绑定(而另一方可以连接)。
我认为使用PUSH/PULL插座更有意义,因为您有一个标准的通风机-工人-水槽方案,除了通风机和水槽是相同的过程。
另外,考虑使用多处理模块而不是ZeroMQ。这可能会简单一些。
在ZeroMQ中,每个端口只能有一个发布者。唯一(丑陋的)解决方法是在不同的端口上启动每个子PUB套接字,并让父套接字侦听所有这些端口。
但是在0MQ,用户指南中描述的管道模式是一个更好的方法。