我使用Kafka Connect添加了JDBC源连接器,它从MySQL表中获取数据。
连接器已成功添加,并且数据正在其中实时流动。
但相应的主题并不包含以前的所有数据。
我已尝试重新启动Kafka Connect。
这是我的JDBC源连接器配置:
{
"name": "kb_yp_loan",
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "XXXX",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "XXXX",
"config.action.reload": "restart",
"connection.url": "jdbc:mysql://XXXX/XX?user=XXXX&password=XXXX&useCursorFetch=true&defaultFetchSize=1000",
"connection.user": "XXXX",
"connection.password": "XXXX",
"table.whitelist": "yp_loan",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "kb_"
}
请建议我如何在我的卡夫卡主题中携带所有以前的数据。
在connect-distributed.properties
中,您需要添加consumer.auto.offset.reset=earliest
来启动Connect使用者实例以读取所有现有的主题数据。
否则,连接器将从当前最新的主题偏移开始。