我尝试用下面的方法发送批量消息到topic:
public SendResult send(Collection<Message> msgs,
MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
}
但是当我使用主题时,标签发生了变化。更详细的,我设置了&;batch&;作为标签生产时,但得到" batchclusterto0 "当标签被消费时,它就变了!
我用下面的代码在我的PC上试了试,但无法重现你所遇到的。你能展示你的生产者如何使用set标签编码吗?
// producer batch messages
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("xxxxx:9876");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA" + i,
"OrderID188" + i,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg1 = new Message("TopicTest",
"TagA" + i,
"OrderID188" + i,
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
List<Message> msgs = new ArrayList<>();
msgs.add(msg);
msgs.add(msg1);
producer.send(msgs);
}
} catch (Exception e) {
e.printStackTrace();
}
// consuming messages
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("xxxxx:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
消费者结果打印