我使用KafkaJdbcConnect时间戳+递增模式将表行同步到Kafka。参考https://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode
挑战在于,由于默认的开始时间是1970年,因此表从开始时间开始同步。有没有任何方法可以超越开始时间(即(我只想从输入给定日期的开始同步。
您需要将timestamp.initial
设置为所需的输入给定日期。它需要设置为epoch格式。
timestamp+incrementing
模式下的SQL查询附加有:
WHERE "DBTime" < 'system high date i.e. 9999-12-12T23:59:59+00:00' AND (("DBTime" = 'timestamp.initial' AND "DBKey" > '-1') OR "DBTime" > 'timestamp.initial') ORDER BY "DBTime","DBKey" ASC
https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html#mode
如果您想从连接器的给定偏移量开始,我建议覆盖connect-offsets
主题中存储的信息。
通过Kafka REST API,您可以轻松阅读本主题的内容:
http://localhost:8082/topics/connect-偏移
在您所描述的用例的相关方法中查看kafka连接器jdbc的代码:
- io.confluent.connect.jdbc.source.TimestampIncrementingCriteria#extractValues
- io.confluent.connect.jdbc.source.TimestampIncrementingOffset#getTimestampOffset
覆盖连接偏移量主题内容似乎是目前唯一可用的方法。