Spring kafka支持只执行一次SeekToTimestamp



我的用例是根据时间戳设置消费者组偏移量。为此,我在ConsumerSeekAware的onPartitionsAssigned((方法中使用了ConsumerSeek回调的seekToTimestamp方法。

现在,当我启动应用程序时,它会查找我指定的时间戳,但在重新平衡过程中,它会再次查找该时间戳。

我希望只有当ConsumerGroup偏移量小于该特定时间戳的偏移量时才会发生这种情况,如果它大于该偏移量,则不应进行搜索。

有没有一种方法可以实现这一点,或者Spring Kafka是否为新的ConsumerGroup提供了一些监听器,以便在创建新的消费者组时,它将基于时间戳调用seek,否则将使用现有的偏移量?

public class KafkaConsumer implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = 1623775969;
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}

只需在实现中添加一个布尔字段(boolean seeksDone;(;在寻求之后将其设置为真,并且只有在它是假的情况下才寻求。

但是,如果在第一次重新平衡中只得到分区1和3,在下一次重新平衡时只得到分区1,2,3,4,那么您必须决定该怎么办。

当然,如果您只有一个应用程序实例,这不是问题。但是,如果您需要在第一次分配时查找每个分区,则必须跟踪每个分区的状态。

最新更新