Kafka Jdbc连接时间戳+递增模式



我使用KafkaJdbcConnect时间戳+递增模式将表行同步到Kafka。参考https://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode

挑战在于,由于默认的开始时间是1970年,因此表从开始时间开始同步。有没有任何方法可以超越开始时间(即(我只想从输入给定日期的开始同步。

您需要将timestamp.initial设置为所需的输入给定日期。它需要设置为epoch格式。

timestamp+incrementing模式下的SQL查询附加有:

WHERE "DBTime" < 'system high date i.e. 9999-12-12T23:59:59+00:00' AND (("DBTime" = 'timestamp.initial' AND "DBKey" > '-1') OR "DBTime" > 'timestamp.initial') ORDER BY "DBTime","DBKey" ASC

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html#mode

如果您想从连接器的给定偏移量开始,我建议覆盖connect-offsets主题中存储的信息。

通过Kafka REST API,您可以轻松阅读本主题的内容:

http://localhost:8082/topics/connect-偏移

在您所描述的用例的相关方法中查看kafka连接器jdbc的代码:

  • io.confluent.connect.jdbc.source.TimestampIncrementingCriteria#extractValues
  • io.confluent.connect.jdbc.source.TimestampIncrementingOffset#getTimestampOffset

覆盖连接偏移量主题内容似乎是目前唯一可用的方法。

最新更新