使用 Pika 0.9.5 向 RabbitMQ 发送大消息:消息被 Rabbit 静默丢弃



我有一堆芹菜任务,它们获取结果并将它们发布到 RabbitMQ 消息队列中。发布的结果可能会变得非常大(最多几兆)。对于将大量数据放入 RabbitMQ 消息中是否是一个好主意,意见不一,但我在其他情况下看到过这种方法,只要内存得到控制,它似乎就可以工作。

但是,对于我目前的任务集,兔子似乎只是丢弃了似乎太大的消息。我已将其简化为一个相当简单的测试用例:

#!/usr/bin/env python
import string
import random
import pika
import os
qname='examplequeue'
connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='mq.example.com'))
channel = connection.channel()
channel.queue_declare(queue=qname,durable=True)
N=100000
body = ''.join(random.choice(string.ascii_uppercase) for x in range(N))
promise = channel.basic_publish(exchange='', routing_key=qname, body=body, mandatory=0, immediate=0, properties=pika.BasicProperties(content_type="text/plain",delivery_mode=2))
print " [x] Sent 'Hello World!'"
connection.close()

我有一个 3 节点的 RabbitMQ 集群,每个节点mq.example.com轮循机制。客户端在 Ubuntu 12.04 上使用 Pika 0.9.5,RabbitMQ 集群在 Erlang R14B04 上运行 RabbitMQ 2.8.7。

执行此脚本将打印 print 语句并退出,而不会引发任何异常。该消息永远不会出现在 RabbitMQ 中。

N更改为10000使其按预期工作。

为什么?

我想你在 RabbitMq 中的 tcp-backpressure mechanizm 有问题。您可以阅读有关 http://www.rabbitmq.com/memory.html 的信息。我看到两种解决此问题的方法:

  1. 添加 tcp-callback 并重新连接来自 rabbit 的每个 tcp-call
  2. 在将消息发送给兔子之前使用压缩消息,这将使推送到兔子更容易。
def compress(s):
     return binascii.hexlify(zlib.compress(s))
def decompress(s):
    return zlib.decompress(binascii.unhexlify(s))

这就是我发送和接收数据包的方式。它比十六进制更有效,因为 base64 可能使用一个字节,而十六进制需要两个字节来表示一个字符。

import zlib
import base64
def hexpress(send: str):
    print(f"send: {send}")
    bsend = send.encode()
    print(f"byte-encoded send: {bsend}")
    zbsend = zlib.compress(bsend)
    print(f"zipped-byte-encoded-send: {zbsend}")
    hzbsend = base64.b64encode(zbsend)
    print(f"hex-zip-byte-encoded-send: {hzbsend}")
    shzbsend = hzbsend.decode()
    print(f"string-hex-zip-byte-encoded-send: {shzbsend}")
    return shzbsend
def hextract(recv: str):
    print(f"string-hex-zip-byte-encoded-recv: {recv}")
    zbrecv = base64.b64decode(recv)
    print(f"zipped-byte-encoded-recv: {zbrecv}")
    brecv = zlib.decompress(zbrecv)
    print(f"byte-encoded-recv: {brecv}")
    recv = brecv.decode()
    print(f"recv: {recv}")
    return recv
print("sending ...n")
send = "hello this is dog"
packet = hexpress(send)
print("nover the wire -------->>>>>n")
print("receiving...n")
recv = hextract(packet)

相关内容

  • 没有找到相关文章

最新更新