连接到两个RabbitMQ服务器



我将python与pika一起使用,并且有以下两个类似的用例:

  1. 连接到RabbitMQ服务器A和服务器B(位于具有不同凭据的不同IP地址(,侦听服务器A上的交换机A1;当消息到达时,对其进行处理并发送到服务器B上的交换机
  2. 打开HTTP侦听器并连接到RabbitMQ服务器B;当特定的HTTP请求到达时,处理它并发送到服务器B上的交换机

唉,在这两种情况下,使用我常用的技术,当我发送到服务器B时,连接会抛出ConnectionClosedChannelClosed

我认为这就是原因:在等待传入消息时,与服务器B(其"驱动程序"(的连接缺少CPU周期,而且它从未有机会为连接套接字提供服务,因此它无法响应服务器B的心跳,因此服务器关闭了连接。

但我无法解决问题。我目前的解决方法很糟糕:我捕获ConnectionClosed,重新打开与服务器B的连接,然后重试发送消息。

但是什么是";右";怎么做?我考虑过这些,但并不是真的觉得我有所有的部分来解决这个问题:

  • 不要永远坐在服务器A的basic_consume(我通常的模式(中,而是使用超时,当我以某种方式捕捉到超时时"服务";服务器B的驱动程序上的心跳;超时消费";。。。但是我该怎么做呢?如何";让服务B的连接驱动程序服务于它的心跳">
  • 我知道套接字库的select()调用可以在几个套接字上等待消息,然后一次为等待数据包的套接字提供服务。也许这就是鼠兔SelectConnection的作用?a( 我不确定,这只是一种预感。b( 即使是正确的,虽然我可以找到如何创建此连接的示例,但我找不到如何使用来解决我的多连接案例的示例
  • 在不同的进程中设置两个服务器连接。。。并使用Python进程间队列将处理后的消息从一个进程传递到下一个进程。概念是";两个不同进程中的两个不同RabbitMQ连接因此应该能够独立地为它们的心跳提供服务";。除了我认为这有一个致命的缺陷:;服务器B";相反,将是";"卡住";在进程间队列上等待;饥饿";将会发生

昨晚我检查了StackOverflow,并在谷歌上搜索了一个小时:我一辈子都找不到这方面的博客文章或示例代码。

有什么意见吗?万分感谢!

我设法解决了这个问题,我的解决方案基于文档和pika-python谷歌小组中的答案。

首先,您的假设是正确的——连接到服务器B(负责发布(的客户端进程如果已经在其他事情上阻塞,例如等待来自服务器a的消息或在内部通信队列上阻塞,则无法回复心跳。

解决方案的关键是发布者应该作为一个单独的线程运行,并使用BlockingConnection.process_data_events来为检测信号等提供服务。看起来该方法应该在一个循环中调用,该循环检查发布者是否仍需要运行:

def run(self):
while self.is_running:
# Block at most 1 second before returning and re-checking
self.connection.process_data_events(time_limit=1)

概念证明

由于要证明完整的解决方案,需要运行两个独立的RabbitMQ实例,因此我将Git repo与适当的docker-compose.yml、应用程序代码和注释组合在一起,以测试该解决方案。

https://github.com/karls/rabbitmq-two-connections

解决方案大纲

以下是解决方案的草图,减去导入等。一些值得注意的事情:

  1. Publisher作为单独的线程运行
  2. 唯一的";工作;发布者通过Connection.process_data_events为心跳等提供服务
  3. 每当消费者想要发布消息时,发布者就会使用Connection.add_callback_threadsafe注册回调
  4. 使用者将发布者作为构造函数参数,这样它就可以发布接收到的消息,但只要您有对Publisher实例的引用,它就可以通过任何其他机制工作
  5. 该代码取自链接的Git repo,这就是为什么某些细节是硬编码的,例如队列名称等。它将适用于所需的任何RabbitMQ设置(直接到队列、主题交换、扇出等(
class Publisher(threading.Thread):
def __init__(
self,
connection_params: ConnectionParameters,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.daemon = True
self.is_running = True
self.name = "Publisher"
self.queue = "downstream_queue"
self.connection = BlockingConnection(connection_params)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue, auto_delete=True)
self.channel.confirm_delivery()
def run(self):
while self.is_running:
self.connection.process_data_events(time_limit=1)
def _publish(self, message):
logger.info("Calling '_publish'")
self.channel.basic_publish("", self.queue, body=message.encode())
def publish(self, message):
logger.info("Calling 'publish'")
self.connection.add_callback_threadsafe(lambda: self._publish(message))
def stop(self):
logger.info("Stopping...")
self.is_running = False
# Call .process_data_events one more time to block
# and allow the while-loop in .run() to break.
# Otherwise the connection might be closed too early.
#
self.connection.process_data_events(time_limit=1)
if self.connection.is_open:
self.connection.close()
logger.info("Connection closed")
logger.info("Stopped")

class Consumer:
def __init__(
self,
connection_params: ConnectionParameters,
publisher: Optional["Publisher"] = None,
):
self.publisher = publisher
self.queue = "upstream_queue"
self.connection = BlockingConnection(connection_params)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue, auto_delete=True)
self.channel.basic_qos(prefetch_count=1)
def start(self):
self.channel.basic_consume(
queue=self.queue, on_message_callback=self.on_message
)
try:
self.channel.start_consuming()
except KeyboardInterrupt:
logger.info("Warm shutdown requested...")
except Exception:
traceback.print_exception(*sys.exc_info())
finally:
self.stop()
def on_message(self, _channel: Channel, m, _properties, body):
try:
message = body.decode()
logger.info(f"Got: {message!r}")
if self.publisher:
self.publisher.publish(message)
else:
logger.info(f"No publisher provided, printing message: {message!r}")
self.channel.basic_ack(delivery_tag=m.delivery_tag)
except Exception:
traceback.print_exception(*sys.exc_info())
self.channel.basic_nack(delivery_tag=m.delivery_tag, requeue=False)
def stop(self):
logger.info("Stopping consuming...")
if self.connection.is_open:
logger.info("Closing connection...")
self.connection.close()
if self.publisher:
self.publisher.stop()
logger.info("Stopped")