ZeroMQ不同速度的订阅者看到相同的消息

  • 本文关键字:消息 速度 ZeroMQ c++ zeromq
  • 更新时间 :
  • 英文 :


我在 c++ 中使用 zmq 2.2(我知道是旧版本)来创建具有多个以不同速度读取消息的连接订阅者的发布者。根据我对文档的理解,以及Peter Hintjens在这里的回答,每个订阅者都有自己的队列,发布者为每个连接的订阅者都有一个队列。这似乎表明每个订阅者都独立于其他订阅者接收来自发布者的消息。

但是,在下面的代码片段中,快速和慢速订阅者会收到类似的消息或完全相同的消息(即使我增加了A点的睡眠时间并更改了B点的ZMQ_HWM,也会发生这种情况)。

谁能阐明为什么会发生这种情况?

#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <future>
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;
vector<int> slow_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
sleep(3);  // 3 seconds
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
usleep(10000);  // 10 miliseconds ___________________________POINT A
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
void publisher(int64_t hwm)
{
context_t context{1};
socket_t socket(context, ZMQ_PUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.bind("tcp://*:5554");
int count = 0;
while (true) {
msg_t msg(sizeof(count));
memcpy(msg.data(), &count, sizeof(count));
socket.send(msg);
count++;
}
}
int main() 
{
int64_t hwm = 1;  // __________________________________________POINT B
int to_read = 20;
auto fast = async(launch::async, fast_consumer, hwm, to_read);
auto slow = async(launch::async, slow_consumer, hwm, to_read);
hwm = 1;  // Don't queue anything on the publisher
thread pub(publisher, hwm);
auto slow_v = slow.get();
auto fast_v = fast.get();
cout << "fast    slow" << endl;
for (int i = 0; i < fast_v.size(); i ++)
{
cout << fast_v[i] << "   " << slow_v[i] << endl;
}
exit(0);
}

编译者:g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread者 海湾合作委员会 6.3

示例输出:

fast    slow
25988   305855
52522   454312
79197   477807
106365   502594
132793   528551
159236   554519
184486   581419
209208   606411
234483   629298
256122   651159
281188   675031
305855   701533  // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312   727817
477807   754154
502594   778654
528551   804137
554519   830677
581419   854959
606411   878841
629298   902601

每个订阅者都有自己的队列

是的,它确实...

这来自PUB.Context()实例的设计属性,其中发生了发送队列管理(稍后会详细介绍)。

人们可能会在[不到五秒的时间内完成ZeroMQ层次结构]部分中简要阅读有关主要概念技巧的简短阅读。

这似乎表明每个订阅者都独立于其他订阅者接收来自发布者的消息。

是的,它确实...

各个">专用"队列之间没有交互。这里重要的是ZMQ_HWM,其副作用作用是"阻止者"语义。

在此设置中,简约ZMQ_HWM保护/阻止任何新条目插入PUB端"私有"发送队列(大小不超过根据ZMQ_HWM == 1),直到它被成功远程清空(由"远程"SUBContext()自主异步"内部"传输相关计划,在其可能(重新)加载该SUB-侧"私有"-接收-队列(大小,再次,不超过根据ZMQ_HWM == 1)

换句话说,PUB.send()-s 的有效负载将被有效地丢弃,直到远程*_SUB.recv()-s 将从其"远程"Context()-实例的接收队列中卸载"阻塞"有效负载(根据ZMQ_HWM == 1,大小,设计为不能存储任何单个有效负载超过一个)。

以这种方式,PUB.send()-er 在(秘密阻止)测试期间发射了超过~ 902601条消息,在SUB端(== to_read)接收了大约20条消息。

所有这些902581+消息在调用.send()方法时被Context()直接扔在PUB边。


它实际上是如何在内部工作的?Context()内部的简化视图

给定上面的模型示例,Context()托管的队列池根据.connect()ed 对等节点出现和消失而增长/合约,但在 ZeroMQ API v2.2 中,TX 和 RX 侧具有相同的高水位标记上限。如前所述,尝试.send()任何超过此限制的内容都会被丢弃。

TIME                   _____________________________
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     PUB.setsockopt(  ZMQ_HWM, 1 );]
v                     PUB.send()-s     [        |   ]
v                        :             [        +-----------------QUEUE-length ( a storage depth ) is but one single message
v    _________________   :             [             
v   [                 ]  :             [Context()-managed pool-of-QUEUE(s)
v   [                 ]  :             [
v   [                 ]  :             [          ___________________
v   [                 ]  :             [         [                   ]
v   FAST_SUB.connect()---:------------>[?]       [                   ]
v   FAST_SUB.recv()-s    :             [?]       [                   ]
v           :            :             [?]       [                   ]
v           :            :             [?][?]<---SLOW_SUB.connect()  ]
v           :            :             [?][?]    SLOW_SUB.recv()-s   ]
v           :            .send(1)----->[1][1]            :
|       1 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(2)----->[2][1]            :
|       2 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(3)----->[3][1]            :
|       3 <-.recv()--------------------[?][?]------------.recv()-> 1
|           :                          [?][?]            :
|           :            .send(4)----->[4][4]            :
|       4 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(5)----->[5][4]            :
|       5 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(6)----->[6][4]            :
|       6 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(7)----->[7][4]            :
|       7 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(8)----->[8][4]            :
|       8 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(9)----->[9][4]            :
|       9 <-.recv()--------------------[?][?]------------.recv()-> 4
|           :                          [?][?]            :
|           :            .send(A)----->[A][A]            :
|       A <-.recv()--------------------[?][A]
|           :                          [?][A]
|           :            .send(B)----->[B][A]
|       B <-.recv()--------------------[?][A]
v           :                          [  [
v           :                          [
v           :
v

"从此处开始的快速订阅者上的消息慢速订阅者上的消息对齐">

不,这不会发生。没有"阵容",而只是持续时间的巧合,其中快速SUB还没有达到 20 倍.recv()-s,然后慢速(-ed)-SUB终于在它阻止sleep( 3 )之后得到了。

最初的"差距"只是sleep( 3 )阶段的影响,在这个阶段,较慢的SUB不会尝试接收任何东西。

main(){
|  
| async(launch::async,fast|_fast____________|
| async(launch::async,slow|     .setsockopt |_slow____________|
| ...                     |     .setsockopt |     .setsockopt |
| ...                     |     .connect    |     .setsockopt |
| thread                  |      ~~~~~~?    |     .connect    |
| |_pub___________________|      ~~~~~~?    |      ~~~~~~?    |
| |    .setsockopt        |      ~~~~~~?    |      ~~~~~~?    |
| |    .bind              |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~?           |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~=RTO        |      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  1,2,..99|      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  23456,..|      ~~~~~~=RTO |      ~~~~~~=RTO |
| |    .send()-s  25988,..|  25988 --> v[ 0]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s  52522,..|  52522 --> v[ 1]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s  79197,..|  79197 --> v[ 2]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 106365,..| 106365 --> v[ 3]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 132793,..| 132793 --> v[ 4]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 159236,..| 159236 --> v[ 5]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 184486,..| 184486 --> v[ 6]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 209208,..| 209208 --> v[ 7]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 234483,..| 234483 --> v[ 8]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 256122,..| 256122 --> v[ 9]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 281188,..| 281188 --> v[10]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| |    .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| |    .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| |    .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| |    .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| |    .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| |    .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| |    .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| |    .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| |    .send()-s 651159,..|                 | 651159 --> v[ 9]|
| |    .send()-s 675031,..|     return v    | 675031 --> v[10]|
| |    .send()-s 701533,..|_________________| 701533 --> v[11]|
| |    .send()-s 727817,..|                 | 727817 --> v[12]|
| |    .send()-s 754154,..|                 | 754154 --> v[13]|
| |    .send()-s 778654,..|                 | 778654 --> v[14]|
| |    .send()-s 804137,..|                 | 804137 --> v[15]|
| |    .send()-s 830677,..|                 | 830677 --> v[16]|
| |    .send()-s 854959,..|                 | 854959 --> v[17]|
| |    .send()-s 878841,..|                 | 878841 --> v[18]|
| |    .send()-s 902601,..|                 | 902601 --> v[19]|
| |    .send()-s 912345,..|                 |                 |
| |    .send()-s 923456,..|                 |     return v    |
| |    .send()-s 934567,..|                 |_________________|
| |    .send()-s 945678,..|
| |    .send()-s 956789,..|
| |    .send()-s 967890,..|
| |    .send()-s 978901,..|
| |    .send()-s 989012,..|
| |    .send()-s 990123,..|
| |    .send()-s ad inf,..|                    

虽然PUB端代码会尽可能快地调用.send()-s,但它是本地的Context()-instance 没有保留比只接受一条这样的消息更多的空间,每当一个排队的独奏位置被占用时,所有其他消息都会被静默丢弃。

每当HWM == 1标记恢复到零时,内部机制确实允许下一个其他.send()将消息的实际内容(有效负载)向下传递到队列存储,并且由于HWM绑定逻辑,所有来自.send()-s的后续尝试都开始被静默丢弃。

最新更新