如何使用rest api设置kafkaconnectauto.offset.reset



我创建了一个sink-kafka连接,用于将数据转换到其他存储;当使用kafka connect rest api创建新连接器时,我想将auto.offset.reset设置为latest;我在配置中设置了consumer.auto.offset.reset: latest

json { "name": "test_v14", "config": { "name": "test_v14", "consumer.auto.offset.reset": "latest", "connector.class": "...", ... } }

但当任务开始时,卡夫卡消费者仍然从最早的民调记录;将auto.offset.reset设置为最新的任何其他方法也是如此;

Kafka 2.3之前

需要在connect-distributed.properties文件(Worker(中设置consumer.auto.offset.reset

它不能应用于任何特定的连接器,除非连接器类显式创建并加载在该属性中读取的自己的Consumer对象。

从Apache Kafka 2.3开始,现在可以将其设置为连接器配置的一部分。

工作者集合上:

connector.client.config.override.policy=All

然后在连接器中,您可以指定

"consumer.override.auto.offset.reset": "latest"

有关更多详细信息,请参阅:https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/

相关内容

  • 没有找到相关文章

最新更新