如何在java中一个接一个地获取Kafka Consumer的消息



我正在使用Apache Kafka API,并试图一次只获取一条消息。我只写一个主题。我可以通过一个带有文本框的弹出式UI屏幕来发送和接收消息。我在文本框中输入一个字符串;发送"我想发多少条信息就发多少条。假设我发送了3条消息,而我的3条消息是";嗨"哈哈,"再见"还有一个";接收";按钮现在,使用TutorialsPoint中的传统代码,当我点击接收按钮时,我会立即在控制台上打印出所有3条消息(嗨,lol,再见(。然而,当我点击"打印"时,我只想一次打印一条消息;接收";在UI上。例如,当我第一次按下接收按钮时,它会打印";嗨;第二次将是";哈哈,";第三次将是";再见"我是卡夫卡的新手,对如何做到这一点感到困惑。我试着从代码中删除两个循环,所以它只有

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

如果我只有这两行代码,当我第一次按下接收按钮时,它会打印";嗨"但当我第二次按下它时;尝试心跳失败,因为小组正在重新平衡卡夫卡"当我将max.poll.records设置为1时,我也会遇到错误,因为我希望我的所有消息最终,但当按下接收按钮时,只需要将其中一条记录到控制台。下次,将记录主题中未记录的下一条消息。

希望这是有道理的!感谢您的帮助!提前感谢!

编辑:包含队列后的新代码,这样我们就可以在发送和接收消息之间交替使用&每当出现新消息时更新队列:

if (payloadQueue.isEmpty()){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (records.isEmpty()) {
log.info("No More Records to Add");
consumer.close();
}
else {
records.iterator().forEachRemaining(record -> {
log.info("RECORD: " + record);
payloadQueue.offer(record);
});
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload "{}"", subject, payload);
}
}
else {
payload = payloadQueue.poll().value();
log.info("Received event from KAFKA on subject {} with payload "{}"", subject, payload);
}

Kafka使用batch-get来提高性能,实际上不需要设置max.poll.records=1。
您想要的东西可以通过一些变通方法轻松实现。

解决方案

您可以使用Queue来存储消息,每次按下接收按钮时,您从队列中轮询一条消息,如果队列为空,则调用consumer.poll来填充队列。

代码

private Queue<ConsumerRecord<String,String>> queue=new LinkedList<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
public void buttonPressed(){
if (queue.isEmpty()){
consumer.poll(100).iterator().forEachRemaining(record->queue.offer(record));
}else {
System.out.println(queue.poll());
}
}

您应该按照@haoyu的建议添加一个队列,但手动提交消耗的偏移量。否则,应用程序重置可能会导致数据丢失(因为消息已经从主题中消耗掉,尽管没有打印到UI中(。

建议阅读";手动偏移控制";以及";将偏移存储在Kafka之外";KafkaConsumer javadoc。

相关内容

最新更新