我创建了一个sink-kafka连接,用于将数据转换到其他存储;当使用kafka connect rest api
创建新连接器时,我想将auto.offset.reset
设置为latest
;我在配置中设置了consumer.auto.offset.reset: latest
;
json
{
"name": "test_v14",
"config": {
"name": "test_v14",
"consumer.auto.offset.reset": "latest",
"connector.class": "...",
...
}
}
但当任务开始时,卡夫卡消费者仍然从最早的民调记录;将auto.offset.reset
设置为最新的任何其他方法也是如此;
Kafka 2.3之前
需要在connect-distributed.properties
文件(Worker(中设置consumer.auto.offset.reset
。
它不能应用于任何特定的连接器,除非连接器类显式创建并加载在该属性中读取的自己的Consumer对象。
从Apache Kafka 2.3开始,现在可以将其设置为连接器配置的一部分。
在工作者集合上:
connector.client.config.override.policy=All
然后在连接器中,您可以指定
"consumer.override.auto.offset.reset": "latest"
有关更多详细信息,请参阅:https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/