为什么ZeroMQ SUB缺少消息



我在每台计算机上建立了大约12000个订阅者,线程如下

用户端:

def client(id):
context=zmq.Context()
subscriber=context.socket(zmq.SUB)
subscriber.connect('ip:port')
subscriber.setsockopt(zmq.SUBSCRIBE,(id+'e').encode())
while 1:
signal=subscriber.recv_multipart()

write logs...

for i in range(12000):
threading.Thread(target=client,args=(str(i+j*12000),)).start()
#j is arbitrary unduplicated int

发行方:

subscriber=zmq.Context().socket(zmq.PUB)
subscriber.bind('tcp://*:port')
while 1:
for id in client_id:
subscriber.send_multipart([(id+'e').encode()]+[message])

当我使用多台计算机(通过使用不同的j(来建立订阅者时,有时一些订阅者根本无法接收消息。

若我重新启动订阅者,那个些并没有收到消息的人就正常了。但那些正常的人却无法接收信息。

这些问题不会显示任何错误,只能在我的日志中找到。

这个问题会发生过度连接吗?

随着连接/消息/大小的计数越来越大,一些默认的猜测通常不再足够。尝试在PUB端配置上扩展一些其他工作默认值,在那里问题似乎开始阻塞(别忘了,自v3.?+以来,订阅列表处理已从SUB侧转移到中央PUB-侧。这减少了数据流的数量,但在PUB-侧的一些额外成本(此处增长到显著数量(~用于缓冲区的RAM+用于TOPIC列表过滤的CPU。。。

因此,让我们从PUB侧的以下步骤开始:

aSock2SUBs = zmq.Context( _tweak_nIOthreads ).socket( zmq.PUB ) # MORE CPU POWER
aSock2SUBs.setsockopt( zmq.SNDBUF, _tweak_SIZE_with_SO_SNDBUF ) # ROOM IN SNDBUF

最后但并非最不重要的是,PUB确实会静默地丢弃任何不会"丢失"的消息;"适合";在目前的HighWaterMark级别下,所以让我们也调整一下这个:

aSock2SUBs.setsockopt( zmq.SNDHWM, _tweak_HWM_till_no_DROPs )   # TILL NO DROPS

其他{ TCP_* | TOS | RECONNECT_IVL* | BACKLOG | IMMEDIATE | HEARTBEAT_* | ... }低级别参数设置可能有助于进一步使您的12k+SUB群与其他(友好和敌对(流量和平共处,并使您的应用程序更加健壮,而不是仅仅依赖预先准备好的API-defaults。

请同时参阅ZeroMQ API文档和O/S默认值,因为这些ZeroMQ低级属性中的许多属性也依赖于O/S实际配置值。

您还应该注意,在Python中生成12k+线程仍然会留下纯粹的[SERIAL]代码执行,因为Python中心GIL锁所有权(独占(避免了(是的,主要避免了(任何形式的[CONCURRENT]协同执行,因为GIL锁的所有权是独占的,并且[SERIAL]将任何数量的线程重新分配到等待队列中,并导致块的执行顺序(默认情况下,Python 2将每100条指令切换一次线程。由于Python 3.2+,默认情况下GIL将在5毫秒(5000[us](后释放,以便其他线程有机会尝试&也获取GIL锁。如果交换GIL锁所有权的12k+线程之战实际上导致";几乎阻塞";用于消息缓冲、堆叠、发送、重传直到及时确认接收的任何和所有TCP/IP工具。人们可能会对其进行测试,直到出现出血边缘,但如果其他参数经过良好的稳健性调整,选择一些更安全的天花板可能会有所帮助。


最后但并非最不重要的是,享受零的禅,这是Martin SUSTRIK在分布式计算方面的杰作,经过精心设计,最终可扩展,几乎为零延迟,非常舒适,广泛移植的信令&消息传递框架。

除了user3666197的答案,您可能还需要考虑所有这些客户端连接所需的时间。PUBlisher不知道应该有多少SUBscriber,只会从第一次连接开始,继续向当前连接的SUBscribers发送消息。PUBlisher套接字不会挂起它发送的消息,以防将来有更多SUBscriber在某个未定义的时间连接。一旦一条消息被传输到一个或多个SUBscriber,它就会从PUBlisher的队列中删除。此外,连接不是即时建立的,12000是相当多的连接。

不管你是先启动PUBlisher还是SUBscriber程序;一旦两个程序都在运行,您的12000个连接将在一段时间内建立,这将异步写入您自己的线程。一些抄写员将开始收到信息,而其他抄写员仍然不知道PUBlisher。最终,当所有12000个连接都完成时,它就会平稳下来。

最新更新