卡夫卡消费者未返回任何记录



我正在尝试用Kafka制作一个小型PoC。然而,当用java生成使用者时,这个使用者不会得到任何消息。尽管当我用相同的url/主题启动kafka-console-consumer.sh时,我确实会收到消息。有人知道我可能做错什么吗?此代码由GET API调用。

public List<KafkaTextMessage> receiveMessages() {
log.info("Retrieving messages from kafka");
val props = new Properties();
// See https://kafka.apache.org/documentation/#consumerconfigs
props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
//props.put("client.id", "my-topic consumer");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ImmutableList.Builder<KafkaTextMessage> builder;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TEXT_MESSAGE_TOPIC));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
builder = ImmutableList.builder();
for (ConsumerRecord<String, String> record : records) {
builder.add(new KafkaTextMessage(record.value()));
log.info("We got at position: {} key:{} value: {}", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
return builder.build();
}

尝试在消费者属性中添加auto.offset.reset=earliest。默认值设置为latest。我之所以建议这样做,是因为我看到您的group.id设置为test,这是您可能在以前的测试中已经使用过的值。

最新更新