如何使用 MQTT Paho 订阅 RabbitMQ 队列



我正在尝试从我的Android应用程序连接到一个名为"messages"的队列。

生产者(AMQP协议下的一个Web服务)已经连接,可以通过RabbitMQ管理面板进行检查。

要从我的Android设备进行连接,我正在像这样编码。

private void connect() throws Exception {
    this.sampleClient = new MqttClient(this.broker, this.clientId);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setUserName("user");
    connOpts.setPassword("user".toCharArray());
    /*connOpts.setConnectionTimeout(60 * 10);
    connOpts.setKeepAliveInterval(60 * 5);*/
    connOpts.setCleanSession(true);
    this.sampleClient.connect(connOpts);
    this.sampleClient.setCallback(this);
    this.sampleClient.subscribe("messages");
    if(!this.sampleClient.isConnected()){
        System.out.println("Not Connected");
        return;
    }
    System.out.println("Connected");
}
我尝试过"amq.topic","amq.topic.

*","amq.topic.messages"等...但是当我查看 RabbitMQ 队列部分时,"消息"与 0 个消费者一起,并且已经自动设置了一个名为"mqtt-subscription-Sampleqos1"的新队列。

发生了什么事情?如何绑定到"消息"队列?

关于这个问题有两个要点。

根据 RabbitMQ MQTT 文档: http://www.rabbitmq.com/mqtt.html

首先,每个队列都通过 mqtt-plugin 自动绑定到 amq.topic 交换。

其次,每个订阅者都有自己的队列,看起来像这样,mqtt-subscription-{cliend_id}{qosX}(其中X是订阅的qos级别)

因此,生产者必须将消息发布到"amq.topic"交换和"amq.topic.."路由密钥

,而接收者必须订阅"amq.topic.."路由密钥。

首先,确保启用了 MQTT 插件: rabbitmq-plugins enable rabbitmq_mqtt

从客户端(这里是你的Android应用程序),你需要一个主题的订阅者,比如说,主题my/android/app/messages。

this.sampleClient.subscribe("my/android/app/messages");

然后,从服务器端,由于 RabbitMQ 的实现,您需要使用适当的路由密钥 my.android.app.messages 将消息发送到特殊的交换 'amq.topic' (注意 '/' 和 '.' 之间的映射,MQTT 使用/和 AMQP 使用 .)。例如,如果您通过pika AMQP Python lib发布,则代码将如下所示:

channel.basic_publish(
    exchange='amq.topic',
    routing_key='my.android.app.messages',
    body='hello world'
)

在您的情况下,您希望从队列"消息"接收消息,基本上无法直接从 MQTT 客户端上的该 AMQP 队列接收订阅者消息。解决方法是创建一个在服务器端运行的服务,作为 AMQP 订阅者工作,从"消息"队列接收消息,并使用正确的路由密钥透明转发消息以交换 amq.topic。

希望我的回答有帮助。

相关内容

  • 没有找到相关文章

最新更新