我使用 ZMQ 的总体目标是避免陷入异步消息传递的杂草;ZMQ 似乎是一个可移植且实用的解决方案。然而,大多数 ZeroMQ 文档都是这样的,以及我用谷歌搜索到的许多其他 zmq 示例都是基于 helloworld.c 格式的。也就是说,它们都是 int main(){}
中的简单过程代码。
我的问题是我想将 zmq"侦听器"嵌入"到类似 c++ 单例的类中。我想"侦听"消息,然后处理它们。我打算使用 zmq 的 PUSH
-> PULL
套接字,以防万一。我不知道该怎么做的是内部"事件循环"。
class foomgr {
public:
static foomgr& get_foomgr();
// ...
private:
foomgr();
foomgr(const &foomgr);
// ...
listener_() {
// EVENT LOOP HERE
// RECV and PROCESS ZMQ MSGS
// while(true) DOES NOT WORK HERE
}
// ...
zmq::context_t zmqcntx_;
zmq::socket_t zmqsock_;
const int zmqsock_linger_ = 1000;
// ....
}
我显然不能在侦听器中使用while(true)
结构,因为无论我从哪里调用它都会阻塞。由于使用 ZMQ 的优点之一是我不必自己管理"侦听器"线程,因此必须弄清楚如何创建自己的线程来包装listener_似乎很愚蠢。我对解决方案感到迷茫。
注意:我是一个 c++ 新手,所以对大多数人来说可能很明显的东西对我来说并不明显。另外,我正在尝试使用通用的"单词",而不是特定于库或语言以避免混淆。代码是使用 -std=c++11 构建的,所以那些构造很好。
ZMQ C++ 库不实现消息轮询的侦听器模式。它将该任务留给您包装自己的类。但是,它确实支持轮询新消息的非阻塞模式。
因此,使用正确的代码,您可以以非阻塞的方式将其包装在一个小循环中。
请参阅用 C++ 编写的 GitHub 上的轮询示例。请注意,它从 2 个套接字轮询,因此您需要对其进行一些修改以删除额外的代码。
您需要包装在自己的观察器实现中的重要部分如下:
zmq::message_t message;
zmq::poll (&items [0], 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
receiver.recv(&message);
// Process task
}
Zmq 在设计上不是线程安全的(到目前为止的版本)。事实上,Zmq强调:
不应使用回调,不要使用或关闭套接字,除非在创建套接字的线程中。 时期。
因为调用回调的线程肯定与创建套接字的线程不同,这是被禁止的。
也许,你会发现有用的 zmqHelper,一个小库(只有两个类和几个函数),以便在 C++ 中使用 Zmq 更容易,并强制执行(保证)线程不能共享套接字。
在示例部分中,您将了解如何执行最常见的任务。
希望对您有所帮助。
代码片段:在路由器-经销商代理中使用 zmqHelper 进行轮询。
zmq::context_t theContext {1}; // 1 thread in the socket
SocketAdaptor< ZMQ_ROUTER > frontend_ROUTER {theContext};
SocketAdaptor< ZMQ_DEALER > backend_DEALER {theContext};
frontend_ROUTER.bind ("tcp://*:8000");
backend_DEALER.bind ("tcp://*:8001");
while (true) {
std::vector<std::string> lines;
//
// wait (blocking poll) for data in any socket
//
std::vector< zmqHelper::ZmqSocketType * > list
= { frontend_ROUTER.getZmqSocket(), backend_DEALER.getZmqSocket() };
zmqHelper::ZmqSocketType * from = zmqHelper::waitForDataInSockets ( list );
//
// there is data, where is it from?
//
if ( from == frontend_ROUTER.getZmqSocket() ) {
// from frontend, read ...
frontend_ROUTER.receiveText (lines);
// ... and resend
backend_DEALER.sendText( lines );
}
else if ( from == backend_DEALER.getZmqSocket() ) {
// from backend, read ...
backend_DEALER.receiveText (lines);
// ... and resend
frontend_ROUTER.sendText( lines );
}
else if ( from == nullptr ) {
std::cerr << "Error in poll ?n";
}
} // while (true)