ZeroMQ,我们可以使用 inproc: 传输以及发布/订阅消息传递模式



场景:

我们正在评估事件驱动机制的ZeroMQ(特别是jeroMq(。

应用程序是分布式的,其中多个服务(发布者和订阅者都是服务(可以存在于同一个 jvm 或不同的节点中,这取决于部署体系结构。

观察

为了玩,我创建了一个inproc: 作为传输的 pub/sub 模式,使用 jero mq(版本:0.3.5(

  1. 线程发布能够发布(看起来像发布,至少没有错误(
  2. 位于另一个线程中的订阅者没有收到任何内容。

问题

inproc:pub/sub一起使用是否可行?

尝试谷歌搜索,但找不到任何具体的东西,有什么见解吗?

pub/sub的代码示例与inproc:

使用 jero mq(版本 :0.3.5(的 inproc pub sub 的工作代码示例对于以后访问这篇文章的人很有用。一个发布者发布主题AB,两个订阅者分别接收AB

/**
 * @param args
 */
public static void main(String[] args) {
    // The single ZMQ instance
    final Context context = ZMQ.context(1);
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            startPublishing(context);
        }
    });
    //Subscriber for topic "A"
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Subscriber for topic "B"
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });
}
/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {
    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}
/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {
    Socket subscriber = context.socket(ZMQ.SUB);
    subscriber.connect("inproc://test");
    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();
}
/**
 * Prepare and receive though the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare our context and subscriber
    Socket subscriber = context.socket(ZMQ.SUB);
    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();
}

ZMQ inproc 传输旨在用于不同线程之间的单个进程。 当你说"可以存在于同一个jvm或不同的节点中"(强调我的(时,我假设你的意思是你正在将多个进程作为分布式服务而不是单个进程中的多个线程。

如果是这种情况,那么不,您要做的事情不适用于inproc. PUB-SUB/inproc可以在多个线程之间的单个进程中正常工作。


编辑以解决评论中的更多问题:

使用 inprocipc 等传输的原因是,当您在正确的上下文中使用它们时,它比 tcp 传输更有效(更快(。您可以想象混合使用传输,但您始终必须在同一传输上绑定和连接才能使其工作。

这意味着每个节点最多需要三个PUBSUB套接字 - 一个tcp发布者与远程主机上的节点通信,一个ipc发布服务器与同一主机上不同进程上的节点通信,以及一个inproc发布服务器与同一进程中不同线程中的节点通信。

实际上,在大多数情况下,您只需使用tcp传输,并且只为所有内容旋转一个插槽 - tcp可以在任何地方工作。如果每个套接字负责特定类型的信息,则启动多个套接字可能是有意义的。

如果您总是将一种消息类型发送到其他

线程,而将不同的消息类型发送到其他主机,那么多个套接字是有意义的,但在您的情况下,从一个节点的角度来看,听起来所有其他节点都是平等的。在这种情况下,我会在任何地方使用tcp并完成它。

最新更新