如何设置消费者再平衡监听器获取调用的消费者感知再平衡监听器实例?



也许这是一个幼稚的问题,但它让我在那里呆了一段时间。请耐心等待。

我有一个类DataConsumer.java它实现了ConsumerAwareRebalanceListener

@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// seek offsets based on a given timestamp
}
@KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
receive(ConsumerRecord payload) {}
}

因此,为了使onPartitionsAssigned正常工作,我需要在另一个类中定义的kafkaListenerContainerFactory方法中调用setConsumerRebalanceListener,如下所示:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setConsumerRebalanceListener(____________);
// rest part omitted
}

我的问题是关于上面____________部分。我该放什么?

在我的理解中,方法kafkaListenerContainerFactory是在我们在DataConsumer类中初始化@KafkaListener容器时调用的,这意味着已经有一个现有的DataConsumer实例来保存@kafkaLister。如何将已经存在的 DataConsumer 实例传递给setConsumerRebalanceListener函数?

我可以搜索到的所有示例代码片段如下所示:

setConsumerRebalanceListener(new ConsumerRebalanceListener() {
//override the functions
})

但这不是在创建一个新实例吗?如果我放new DataConsumer()它将在现有实例中失去一些状态(例如寻找偏移量的时间戳(,因此这不起作用。

您可以将DataConsumer声明为@Bean(而不是使用@Component(,然后您可以在那里注入您的 bean。

但是,在这种情况下使用这是错误的机制。

改为实现ConsumerSeekAware- 容器将自动检测到您的侦听器是否实现了该实现,并将调用其onPartitionsAssigned

请参阅文档中的查找特定偏移量。

最新更新