我正在尝试从我的Android应用程序连接到一个名为"messages"的队列。
生产者(AMQP协议下的一个Web服务)已经连接,可以通过RabbitMQ管理面板进行检查。
要从我的Android设备进行连接,我正在像这样编码。
private void connect() throws Exception {
this.sampleClient = new MqttClient(this.broker, this.clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("user");
connOpts.setPassword("user".toCharArray());
/*connOpts.setConnectionTimeout(60 * 10);
connOpts.setKeepAliveInterval(60 * 5);*/
connOpts.setCleanSession(true);
this.sampleClient.connect(connOpts);
this.sampleClient.setCallback(this);
this.sampleClient.subscribe("messages");
if(!this.sampleClient.isConnected()){
System.out.println("Not Connected");
return;
}
System.out.println("Connected");
}
我尝试过"amq.topic","amq.topic.*","amq.topic.messages"等...但是当我查看 RabbitMQ 队列部分时,"消息"与 0 个消费者一起,并且已经自动设置了一个名为"mqtt-subscription-Sampleqos1"的新队列。
发生了什么事情?如何绑定到"消息"队列?
关于这个问题有两个要点。
根据 RabbitMQ MQTT 文档: http://www.rabbitmq.com/mqtt.html
首先,每个队列都通过 mqtt-plugin 自动绑定到 amq.topic 交换。
其次,每个订阅者都有自己的队列,看起来像这样,mqtt-subscription-{cliend_id}{qosX}(其中X是订阅的qos级别)
因此,生产者必须将消息发布到"amq.topic"交换和"amq.topic.."路由密钥,而接收者必须订阅"amq.topic.."路由密钥。
首先,确保启用了 MQTT 插件: rabbitmq-plugins enable rabbitmq_mqtt
从客户端(这里是你的Android应用程序),你需要一个主题的订阅者,比如说,主题my/android/app/messages。
this.sampleClient.subscribe("my/android/app/messages");
然后,从服务器端,由于 RabbitMQ 的实现,您需要使用适当的路由密钥 my.android.app.messages 将消息发送到特殊的交换 'amq.topic' (注意 '/' 和 '.' 之间的映射,MQTT 使用/和 AMQP 使用 .)。例如,如果您通过pika AMQP Python lib发布,则代码将如下所示:
channel.basic_publish(
exchange='amq.topic',
routing_key='my.android.app.messages',
body='hello world'
)
在您的情况下,您希望从队列"消息"接收消息,基本上无法直接从 MQTT 客户端上的该 AMQP 队列接收订阅者消息。解决方法是创建一个在服务器端运行的服务,作为 AMQP 订阅者工作,从"消息"队列接收消息,并使用正确的路由密钥透明转发消息以交换 amq.topic。
希望我的回答有帮助。