我在 c++ 中使用 zmq 2.2(我知道是旧版本)来创建具有多个以不同速度读取消息的连接订阅者的发布者。根据我对文档的理解,以及Peter Hintjens在这里的回答,每个订阅者都有自己的队列,发布者为每个连接的订阅者都有一个队列。这似乎表明每个订阅者都独立于其他订阅者接收来自发布者的消息。
但是,在下面的代码片段中,快速和慢速订阅者会收到类似的消息或完全相同的消息(即使我增加了A点的睡眠时间并更改了B点的ZMQ_HWM
,也会发生这种情况)。
谁能阐明为什么会发生这种情况?
#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <future>
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;
vector<int> slow_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
sleep(3); // 3 seconds
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
usleep(10000); // 10 miliseconds ___________________________POINT A
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
void publisher(int64_t hwm)
{
context_t context{1};
socket_t socket(context, ZMQ_PUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.bind("tcp://*:5554");
int count = 0;
while (true) {
msg_t msg(sizeof(count));
memcpy(msg.data(), &count, sizeof(count));
socket.send(msg);
count++;
}
}
int main()
{
int64_t hwm = 1; // __________________________________________POINT B
int to_read = 20;
auto fast = async(launch::async, fast_consumer, hwm, to_read);
auto slow = async(launch::async, slow_consumer, hwm, to_read);
hwm = 1; // Don't queue anything on the publisher
thread pub(publisher, hwm);
auto slow_v = slow.get();
auto fast_v = fast.get();
cout << "fast slow" << endl;
for (int i = 0; i < fast_v.size(); i ++)
{
cout << fast_v[i] << " " << slow_v[i] << endl;
}
exit(0);
}
编译者:g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread
者 海湾合作委员会 6.3
示例输出:
fast slow
25988 305855
52522 454312
79197 477807
106365 502594
132793 528551
159236 554519
184486 581419
209208 606411
234483 629298
256122 651159
281188 675031
305855 701533 // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312 727817
477807 754154
502594 778654
528551 804137
554519 830677
581419 854959
606411 878841
629298 902601
每个订阅者都有自己的队列
是的,它确实...
这来自PUB
端.Context()
实例的设计属性,其中发生了发送队列管理(稍后会详细介绍)。
人们可能会在[不到五秒的时间内完成ZeroMQ层次结构]部分中简要阅读有关主要概念技巧的简短阅读。
这似乎表明每个订阅者都独立于其他订阅者接收来自发布者的消息。
是的,它确实...
各个">专用"队列之间没有交互。这里重要的是ZMQ_HWM
,其副作用作用是"阻止者"语义。
在此设置中,简约的ZMQ_HWM
保护/阻止任何新条目插入PUB
端"私有"发送队列(大小不超过根据ZMQ_HWM == 1
),直到它被成功远程清空(由"远程"SUB
端Context()
自主异步"内部"传输相关计划,在其可能(重新)加载该SUB
-侧"私有"-接收-队列(大小,再次,不超过根据ZMQ_HWM == 1
)
换句话说,PUB.send()
-s 的有效负载将被有效地丢弃,直到远程*_SUB.recv()
-s 将从其"远程"Context()
-实例的接收队列中卸载"阻塞"有效负载(根据ZMQ_HWM == 1
,大小,设计为不能存储任何单个有效负载超过一个)。
以这种方式,PUB.send()
-er 在(秘密阻止)测试期间发射了超过~ 902601
条消息,在SUB
端(== to_read
)接收了大约20
条消息。
所有这些902581+
消息在调用.send()
方法时被Context()
直接扔在PUB
边。
它实际上是如何在内部工作的?Context()
内部的简化视图
给定上面的模型示例,Context()
托管的队列池根据.connect()
ed 对等节点出现和消失而增长/合约,但在 ZeroMQ API v2.2 中,TX 和 RX 侧具有相同的高水位标记上限。如前所述,尝试.send()
任何超过此限制的内容都会被丢弃。
TIME _____________________________
v [ ]
v [ ]
v [ ]
v [ ]
v PUB.setsockopt( ZMQ_HWM, 1 );]
v PUB.send()-s [ | ]
v : [ +-----------------QUEUE-length ( a storage depth ) is but one single message
v _________________ : [
v [ ] : [Context()-managed pool-of-QUEUE(s)
v [ ] : [
v [ ] : [ ___________________
v [ ] : [ [ ]
v FAST_SUB.connect()---:------------>[?] [ ]
v FAST_SUB.recv()-s : [?] [ ]
v : : [?] [ ]
v : : [?][?]<---SLOW_SUB.connect() ]
v : : [?][?] SLOW_SUB.recv()-s ]
v : .send(1)----->[1][1] :
| 1 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(2)----->[2][1] :
| 2 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(3)----->[3][1] :
| 3 <-.recv()--------------------[?][?]------------.recv()-> 1
| : [?][?] :
| : .send(4)----->[4][4] :
| 4 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(5)----->[5][4] :
| 5 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(6)----->[6][4] :
| 6 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(7)----->[7][4] :
| 7 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(8)----->[8][4] :
| 8 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(9)----->[9][4] :
| 9 <-.recv()--------------------[?][?]------------.recv()-> 4
| : [?][?] :
| : .send(A)----->[A][A] :
| A <-.recv()--------------------[?][A]
| : [?][A]
| : .send(B)----->[B][A]
| B <-.recv()--------------------[?][A]
v : [ [
v : [
v :
v
"从此处开始的快速订阅者上的消息与慢速订阅者上的消息对齐">
不,这不会发生。没有"阵容",而只是持续时间的巧合,其中快速SUB
还没有达到 20 倍.recv()
-s,然后慢速(-ed)-SUB
终于在它阻止sleep( 3 )
之后得到了。
最初的"差距"只是sleep( 3 )
阶段的影响,在这个阶段,较慢的SUB
不会尝试接收任何东西。
main(){
|
| async(launch::async,fast|_fast____________|
| async(launch::async,slow| .setsockopt |_slow____________|
| ... | .setsockopt | .setsockopt |
| ... | .connect | .setsockopt |
| thread | ~~~~~~? | .connect |
| |_pub___________________| ~~~~~~? | ~~~~~~? |
| | .setsockopt | ~~~~~~? | ~~~~~~? |
| | .bind | ~~~~~~? | ~~~~~~? |
| | ~~~~~~? | ~~~~~~? | ~~~~~~? |
| | ~~~~~~=RTO | ~~~~~~? | ~~~~~~? |
| | .send()-s 1,2,..99| ~~~~~~? | ~~~~~~? |
| | .send()-s 23456,..| ~~~~~~=RTO | ~~~~~~=RTO |
| | .send()-s 25988,..| 25988 --> v[ 0]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 52522,..| 52522 --> v[ 1]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 79197,..| 79197 --> v[ 2]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 106365,..| 106365 --> v[ 3]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 132793,..| 132793 --> v[ 4]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 159236,..| 159236 --> v[ 5]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 184486,..| 184486 --> v[ 6]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 209208,..| 209208 --> v[ 7]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 234483,..| 234483 --> v[ 8]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 256122,..| 256122 --> v[ 9]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 281188,..| 281188 --> v[10]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| | .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| | .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| | .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| | .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| | .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| | .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| | .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| | .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| | .send()-s 651159,..| | 651159 --> v[ 9]|
| | .send()-s 675031,..| return v | 675031 --> v[10]|
| | .send()-s 701533,..|_________________| 701533 --> v[11]|
| | .send()-s 727817,..| | 727817 --> v[12]|
| | .send()-s 754154,..| | 754154 --> v[13]|
| | .send()-s 778654,..| | 778654 --> v[14]|
| | .send()-s 804137,..| | 804137 --> v[15]|
| | .send()-s 830677,..| | 830677 --> v[16]|
| | .send()-s 854959,..| | 854959 --> v[17]|
| | .send()-s 878841,..| | 878841 --> v[18]|
| | .send()-s 902601,..| | 902601 --> v[19]|
| | .send()-s 912345,..| | |
| | .send()-s 923456,..| | return v |
| | .send()-s 934567,..| |_________________|
| | .send()-s 945678,..|
| | .send()-s 956789,..|
| | .send()-s 967890,..|
| | .send()-s 978901,..|
| | .send()-s 989012,..|
| | .send()-s 990123,..|
| | .send()-s ad inf,..|
虽然PUB
端代码会尽可能快地调用.send()
-s,但它是本地的Context()
-instance 没有保留比只接受一条这样的消息更多的空间,每当一个排队的独奏位置被占用时,所有其他消息都会被静默丢弃。
每当HWM == 1
标记恢复到零时,内部机制确实允许下一个其他.send()
将消息的实际内容(有效负载)向下传递到队列存储,并且由于HWM
绑定逻辑,所有来自.send()
-s的后续尝试都开始被静默丢弃。