如何增加卡夫卡消费者的请求时间



我正在使用带有Spring Listener的Kafka。下面是一段代码。在过去,我们已经发布了超过10万条消息来测试主题,系统似乎运行良好。但几天前,我更改了消费者的groupId。在那之后,这个新消费者试图从一开始就处理所有的消息,这需要很多时间。但一段时间后可能是(10秒(经纪人启动消费者。所以结果没有卡夫卡注册监听消息。

@KafkaListener(
topicPattern = "test",
groupId = "test",
id = "test",
containerFactory = "testKafkaListenerContainerFactory")
public void consume(@Payload String payload) throws IOException {
}

卡夫卡消费者配置:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("security.protocol", "SSL");

然后,我使用cli通过以下命令读取消息,并观察到了相同的行为。整整10秒后,消费者停止阅读卡夫卡的信息。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

如何增加kafka客户端的请求时间或其他更好的方法来解决这个问题?

In the past we have published more than 1 lac message to test topic and system seems to be working fine.

你的卡夫卡有所需的数据吗?卡夫卡不会永远存储消息。消息的持续时间由broker/s中设置的保留时间决定。如果您的数据超过了保留期,它将丢失。

最新更新