如何在JDBC Source Connector中将偏移量设置为开始,以从Kafka中的MySQL表中获取所有数据



我使用Kafka Connect添加了JDBC源连接器,它从MySQL表中获取数据。

连接器已成功添加,并且数据正在其中实时流动。

但相应的主题并不包含以前的所有数据。

我已尝试重新启动Kafka Connect。

这是我的JDBC源连接器配置:

{
"name": "kb_yp_loan",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "XXXX",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "XXXX",
"config.action.reload": "restart",
"connection.url": "jdbc:mysql://XXXX/XX?user=XXXX&password=XXXX&useCursorFetch=true&defaultFetchSize=1000",
"connection.user": "XXXX",
"connection.password": "XXXX",
"table.whitelist": "yp_loan",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "kb_"

}

请建议我如何在我的卡夫卡主题中携带所有以前的数据。

connect-distributed.properties中,您需要添加consumer.auto.offset.reset=earliest来启动Connect使用者实例以读取所有现有的主题数据。

否则,连接器将从当前最新的主题偏移开始。

最新更新