ZMQ C++类中的事件循环



我使用 ZMQ 的总体目标是避免陷入异步消息传递的杂草;ZMQ 似乎是一个可移植且实用的解决方案。然而,大多数 ZeroMQ 文档都是这样的,以及我用谷歌搜索到的许多其他 zmq 示例都是基于 helloworld.c 格式的。也就是说,它们都是 int main(){} 中的简单过程代码。

我的问题是我想将 zmq"侦听器"嵌入"到类似 c++ 单例的类中。我想"侦听"消息,然后处理它们。我打算使用 zmq 的 PUSH -> PULL 套接字,以防万一。我不知道该怎么做的是内部"事件循环"。

class foomgr {
    public:
        static foomgr& get_foomgr();
    // ...
    private:
        foomgr();
        foomgr(const &foomgr);
    // ...
        listener_() {
            // EVENT LOOP HERE
            // RECV and PROCESS ZMQ MSGS
            // while(true) DOES NOT WORK HERE
        }
    // ...
        zmq::context_t zmqcntx_;
        zmq::socket_t zmqsock_;
        const int zmqsock_linger_ = 1000;
    // ....
}

我显然不能在侦听器中使用while(true)结构,因为无论我从哪里调用它都会阻塞。由于使用 ZMQ 的优点之一是我不必自己管理"侦听器"线程,因此必须弄清楚如何创建自己的线程来包装listener_似乎很愚蠢。我对解决方案感到迷茫。

注意:我是一个 c++ 新手,所以对大多数人来说可能很明显的东西对我来说并不明显。另外,我正在尝试使用通用的"单词",而不是特定于库或语言以避免混淆。代码是使用 -std=c++11 构建的,所以那些构造很好。

ZMQ C++ 库不实现消息轮询的侦听器模式。它将该任务留给您包装自己的类。但是,它确实支持轮询新消息的非阻塞模式。

因此,使用正确的代码,您可以以非阻塞的方式将其包装在一个小循环中。

请参阅用 C++ 编写的 GitHub 上的轮询示例。请注意,它从 2 个套接字轮询,因此您需要对其进行一些修改以删除额外的代码。

您需要包装在自己的观察器实现中的重要部分如下:

zmq::message_t message;
zmq::poll (&items [0], 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
    receiver.recv(&message);
    //  Process task
}

Zmq 在设计上不是线程安全的(到目前为止的版本)。事实上,Zmq强调:

不要使用或关闭套接字,除非在创建套接字的线程中。 时期。

不应使用回调,

因为调用回调的线程肯定与创建套接字的线程不同,这是被禁止的。

也许,你会发现有用的 zmqHelper,一个小库(只有两个类和几个函数),以便在 C++ 中使用 Zmq 更容易,并强制执行(保证)线程不能共享套接字。

在示例部分中,您将了解如何执行最常见的任务。

希望对您有所帮助。

代码片段:在路由器-经销商代理中使用 zmqHelper 进行轮询。

zmq::context_t theContext {1}; // 1 thread in the socket 
SocketAdaptor< ZMQ_ROUTER > frontend_ROUTER {theContext};
SocketAdaptor< ZMQ_DEALER > backend_DEALER {theContext};
frontend_ROUTER.bind ("tcp://*:8000");
backend_DEALER.bind ("tcp://*:8001");
while (true) {
  std::vector<std::string> lines;
  // 
  //  wait (blocking poll) for data in any socket
  // 
  std::vector< zmqHelper::ZmqSocketType * > list
    = {  frontend_ROUTER.getZmqSocket(),  backend_DEALER.getZmqSocket() };
  zmqHelper::ZmqSocketType *  from = zmqHelper::waitForDataInSockets ( list );
  // 
  //  there is data, where is it from?
  // 
  if ( from ==  frontend_ROUTER.getZmqSocket() ) {
    // from frontend, read ...
    frontend_ROUTER.receiveText (lines);
    // ... and resend
    backend_DEALER.sendText( lines );
  }
  else if ( from ==  backend_DEALER.getZmqSocket() ) {
    // from backend, read ...
    backend_DEALER.receiveText (lines);
    // ... and resend
    frontend_ROUTER.sendText( lines );
  } 
  else if ( from == nullptr ) {
    std::cerr << "Error in poll ?n";
  }
} // while (true)

相关内容

  • 没有找到相关文章

最新更新