RabbitMQ Streams



使用此文档作为参考:https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-first-application我在RabbitMQ中创建了一个流,并向其中添加了100万条消息

try (Environment environment = Environment.builder().uri("rabbitmq-stream://guest:guest@localhost:5552")
.build()) {
environment.streamCreator().stream("tenth-application-stream").create();
Producer producer = environment.producerBuilder().stream("tenth-application-stream") 
.build();
int messageCount = 1_000_000;
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
IntStream.range(0, messageCount).forEach(i -> {
Message message = producer.messageBuilder().properties().creationTime(System.currentTimeMillis())
.messageId(i).messageBuilder().properties().groupId(String.valueOf(i)).messageBuilder()
.addData(String.valueOf(i).getBytes(StandardCharsets.UTF_8)).build();
producer.send(message, confirmationStatus -> confirmLatch.countDown());
});
try {
boolean done = confirmLatch.await(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (StreamException e) {
System.out.println(e);
}

我正试图通过以下操作读回流中的所有消息:

Consumer consumer = environment.consumerBuilder().stream("tenth-application-stream")
.offset(OffsetSpecification.first()).messageHandler((context, message) -> {
System.out.println(message.getBody().toString());
}).build();

但是,这只会打印到消息499。如何查看流中的所有消息?

由于您的客户端已经读取了499条消息,我们知道有些消息是由生产者发布的。您是否验证过队列中实际上有499多条消息?这可以使用RabbitMQ web管理器来完成。steam队列的一个很好的特性是,在消费者读取消息后,消息可以保留在队列中。

如果其余消息都已发布,那么使用者很可能在使用所有消息之前关闭了连接。博客中的示例(此处为repo(使用Utils.waitAtMost方法延迟关闭连接:

Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);

这将一直等待,直到60秒过去或1M消息已被消耗。或者,您可以使用CountDownLatch方法保持连接打开,直到达到目标。

这两个选项对于生产用例来说似乎都不理想,但对于概念验证来说效果很好。

最新更新