我可以为时间戳模式指定Kafka JDBC连接器的查询吗



我想根据时间戳从表中获取数据(增量主键不可用(。Kafka JDBC连接器通过使用查询来获取时间戳大于上一次迭代中最大时间戳的所有行来实现这一点,从Kafka JDBC连接器的以下代码中可以看出。

protected void timestampWhereClause(ExpressionBuilder builder) {
builder.append(" WHERE ");
coalesceTimestampColumns(builder);
builder.append(" > ? AND ");
coalesceTimestampColumns(builder);
builder.append(" < ? ORDER BY ");
coalesceTimestampColumns(builder);
builder.append(" ASC");
}

然而,通过这种方式,您可能会错过几行,因为您可能有具有相同时间戳的行,这些行在上一次迭代中没有完全消耗掉。我想做的是将此查询条件更改为查询值与上一个时间戳相同或更大的时间戳。通过配置可以做到这一点吗?或者我需要修改代码并创建自己的jar文件吗?

除了Iskuskov Alexander建议的timestamp.delay.interval.ms外,还可以考虑使用基于日志的CDC来实际捕获每个事件。您所描述的场景是基于查询的CDC的局限性之一。

更多信息:https://rmoff.dev/no-more-silos

最新更新