是否可以将KAFKA消费者组中Kafka消费者组的主题重置为Kafka连接器中的主题



我的kafka接收器连接器读取了多个主题(配备了10个任务(,并从所有主题中处理了300多个记录。根据每个记录中保存的信息,连接器可以执行某些操作。

以下是键:触发记录中的值对的示例:

"REPROCESS":"my-topic-1"

阅读此记录后,我需要将主题" My-Topic-1"的偏移重置为每个分区中的0。

我已经阅读了许多地方创建新的KafkaConsumer,订阅了主题分区,然后调用subscribe(...)方法是推荐的方法。例如,

public class MyTask extends SinkTask {
    @Override
    public void put(Collection<SinkRecord> records) {
        records.forEach(record -> {
        if (record.key().toString().equals("REPROCESS")) {
            reprocessTopicRecords(record);
        } else {
            // do something else
        }
        });
    }
    private void reprocessTopicRecords(SinkRecord record) {
        KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer = 
            new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
        reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
            new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
                    // do offset reset here
                }
            }
        );
    }
}

但是,以上策略对我的案例不起作用,因为:1.这取决于发生的团体重新平衡(并非总是发生(2.传递给onPartitionsAssigned方法的"分区"是动态分配的分区,这意味着这些仅是一组完整分区的子集,需要将其偏移重置。例如,此sinktask将仅分配8个保存" My-Topic-1"记录的8个分区中的2个。

我还考虑使用assign(),但这与SinkConnector/SinkTask实现中的分布式消费者模型(消费者组(不兼容。

我知道KAFKA命令行工具kafka-consumer-groups可以做我想要的(我认为(:https://gist.github.com/marwei/cd40657c481f94ebe273ecc166601674b

总而言之,我想使用Java API重置给定主题的所有分区的偏移,然后让接收器连接器拾取偏移更改并继续执行它一直在做的事情(处理记录(。

预先感谢。

我能够通过使用一系列Confluent的Kafka-rest-Proxy API来实现Kafka Connect消费者组的重置偏移:https://docs.confluent.io/current/current/kafka-rest/api.html

此实现不再需要原始帖子中描述的"触发记录"方法FIR,并且纯粹基于REST API。

  1. 暂时删除kafka连接器(这将删除连接器的消费者和(

  2. 为同一消费者组创建一个消费者实例("连接 - "(

  3. 让实例订阅您要重置的请求的主题

  4. 做一个虚拟的民意调查('subscribe'被评估了'(

  5. 重置指定主题的消费者组主题偏移

  6. 做一个虚拟的民意调查(懒惰的'seky'(对消费者的当前偏移状态(在代理中(

  7. 重新创建Kafka连接器(具有相同的连接器名称( - 重新平衡后,消费者将加入该组并阅读最后的偏移量(从0开始(

  8. 删除临时消费者实例

如果您能够使用CLI,则步骤2-6可以替换为:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

对于那些试图通过本机Java API在Kafka连接器代码中执行此操作的人,您不幸: - (

您正在寻找Seek方法。要么偏移

consumer.seek(new TopicPartition("topic-name", partition), offset);

seekToBeginning

但是,我觉得您会与Connect Api API的消费者组竞争。换句话说,假设您使用单独的组ID设置消费者,那么您本质上是从源主题中两次消耗记录,一次是通过连接,然后是您自己的消费者实例。

除非您也明确寻求连接自己的消费者实例(未暴露(,否则您将进入一个怪异的状态。例如,尽管您自己的消费者会在寻找旧的偏移量,否则您的任务仅在新记录上执行,否则您仍将在处理旧事件时进行旧事件。

此外,最终,由于保留政策,您可能会在主题的一开始就获得一个重新赛事活动,例如,旧记录到期,使您的消费者根本不进步,并通过寻求一开始就不断地重新平衡其小组。

我们必须进行非常相似的偏移重置练习。

KafkaConsumer.seek()KafkaConsumer.commitSync()合并良好。

如果您要处理很多主题和分区(Javadoc(,还有其他值得一提的选择。

AdminClient.alterConsumerGroupOffsets(
  String groupId, 
  Map<TopicPartition,OffsetAndMetadata> offsets
)

我们很幸运,因为我们有一段时间停止Kafka Connect实例的奢侈品,因此没有消费者组竞争。

相关内容

  • 没有找到相关文章

最新更新