春季卡卡 - 从头开始寻找偏移



我想从开头使用消息 偏移量.为此,我在属性文件中添加了一个属性"seekToBeginning"=true。我的类具有实现 ConsumerSeekAware 的@KafkaListener,并且我已经覆盖了 onPartitionsAssigned(( 的方法,如下所示。我想知道我是否以正确的方式做这件事。此方法被调用 3 次(有 3 个分区(。另外,我担心的是当存在提交失败异常时,此方法也会被调用。请让我知道以下内容是否正确,或者我应该按分区以及如何过滤。另外,请让我知道如何在提交失败异常的情况下处理此问题。

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (seekToBeginning)
{
assignments.forEach(
(topic, action) -> callback.seekToBeginning(topic.topic(), topic.partition()));
}
}```

如果您有并发性 = 3,那么,是的,它将被调用 3 次,每个消费者一次。

从 2.3.4 开始,有更方便的方法:

/**
* Queue a seekToBeginning operation to the consumer for each
* {@link TopicPartition}. The seek will occur after any pending offset commits.
* The consumer must be currently assigned the specified partition(s).
* @param partitions the {@link TopicPartition}s.
* @since 2.3.4
*/
default void seekToBeginning(Collection<TopicPartition> partitions) {

您需要一个布尔字段来仅在初始分配时执行查找,而不是在重新平衡后执行查找。

如果只有一个使用者(并发 = 1(,则可以是简单的布尔值。

例如boolean initialSeeksDone.

对于并发> 1,您需要一个ThreadLocal

ThreadLocal<Boolean> initialSeeksDone;

然后

if (this.initialSeeksDone.get() == null) {
//seek
this.initialSeeksDone.set(true);
}

最新更新