Spring Kafka监听所有主题并调整分区偏移量



根据spring-kafka的文档,我使用基于注解的@KafkaListener来配置我的消费者。

我看到的是-

  1. 除非我指定偏移量为零,否则在开始时,Kafka消费者会选择未来的消息而不是现有的消息。(我理解这是一个预期的结果,因为我没有指定我想要的偏移量)

  2. 我在文档中看到一个选项来指定一个主题+分区组合,以及一个零偏移量,但是如果我这样做,我必须明确指定我希望我的消费者听哪个主题。

使用上面的方法2,这是我的消费者现在的样子-

@KafkaListener(id = "{group.id}",
        topicPartitions = {
                @TopicPartition(topic = "${kafka.topic.name}",
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        },
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String payload,
                   Acknowledgment ack) throws InterruptedException, IOException {
    logger.debug("This is what we received in the Kafka Consumer = " + payload);
    idService.process(payload);
    ack.acknowledge();
}

虽然我知道有一个选项可以指定"topicPattern"通配符或"topics"列表作为注释配置的一部分,但我没有看到我可以为列出的主题/主题模式提供从零开始的偏移值的地方。有没有一种方法可以将两者结合起来?请建议。

当使用topics和topicPatterns(而不是显式声明分区)时,Kafka决定哪个消费者实例将获得哪些分区。

Kafka将分配分区,初始偏移量将是该组id最后提交的偏移量。您目前无法更改偏移量,但我们正在考虑添加查找函数。

如果你总是想从第一个可用的偏移量开始,使用一个唯一的组id(例如UUID.randomUUID().toString())并设置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

由于Kafka将没有该组id的现有偏移量,它将使用该属性来确定从哪里开始

您也可以使用MANUAL ack模式和never ack模式,这将有效地做同样的事情。

最新更新