我有一堆芹菜任务,它们获取结果并将它们发布到 RabbitMQ 消息队列中。发布的结果可能会变得非常大(最多几兆)。对于将大量数据放入 RabbitMQ 消息中是否是一个好主意,意见不一,但我在其他情况下看到过这种方法,只要内存得到控制,它似乎就可以工作。
但是,对于我目前的任务集,兔子似乎只是丢弃了似乎太大的消息。我已将其简化为一个相当简单的测试用例:
#!/usr/bin/env python
import string
import random
import pika
import os
qname='examplequeue'
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='mq.example.com'))
channel = connection.channel()
channel.queue_declare(queue=qname,durable=True)
N=100000
body = ''.join(random.choice(string.ascii_uppercase) for x in range(N))
promise = channel.basic_publish(exchange='', routing_key=qname, body=body, mandatory=0, immediate=0, properties=pika.BasicProperties(content_type="text/plain",delivery_mode=2))
print " [x] Sent 'Hello World!'"
connection.close()
我有一个 3 节点的 RabbitMQ 集群,每个节点mq.example.com
轮循机制。客户端在 Ubuntu 12.04 上使用 Pika 0.9.5,RabbitMQ 集群在 Erlang R14B04 上运行 RabbitMQ 2.8.7。
执行此脚本将打印 print 语句并退出,而不会引发任何异常。该消息永远不会出现在 RabbitMQ 中。
将N
更改为10000
使其按预期工作。
为什么?
我想你在 RabbitMq 中的 tcp-backpressure mechanizm 有问题。您可以阅读有关 http://www.rabbitmq.com/memory.html 的信息。我看到两种解决此问题的方法:
- 添加 tcp-callback 并重新连接来自 rabbit 的每个 tcp-call
- 在将消息发送给兔子之前使用压缩消息,这将使推送到兔子更容易。
def compress(s): return binascii.hexlify(zlib.compress(s)) def decompress(s): return zlib.decompress(binascii.unhexlify(s))
这就是我发送和接收数据包的方式。它比十六进制更有效,因为 base64 可能使用一个字节,而十六进制需要两个字节来表示一个字符。
import zlib
import base64
def hexpress(send: str):
print(f"send: {send}")
bsend = send.encode()
print(f"byte-encoded send: {bsend}")
zbsend = zlib.compress(bsend)
print(f"zipped-byte-encoded-send: {zbsend}")
hzbsend = base64.b64encode(zbsend)
print(f"hex-zip-byte-encoded-send: {hzbsend}")
shzbsend = hzbsend.decode()
print(f"string-hex-zip-byte-encoded-send: {shzbsend}")
return shzbsend
def hextract(recv: str):
print(f"string-hex-zip-byte-encoded-recv: {recv}")
zbrecv = base64.b64decode(recv)
print(f"zipped-byte-encoded-recv: {zbrecv}")
brecv = zlib.decompress(zbrecv)
print(f"byte-encoded-recv: {brecv}")
recv = brecv.decode()
print(f"recv: {recv}")
return recv
print("sending ...n")
send = "hello this is dog"
packet = hexpress(send)
print("nover the wire -------->>>>>n")
print("receiving...n")
recv = hextract(packet)