根据 confluent 文档,当我们使用查询模式时,我们必须进行偏移量管理。根据我的理解,我们需要跟踪上次更新的时间戳,并在每次重新启动程序时将其传递到 where 子句中。谁能确认理解是否正确?提前感谢您的帮助!
您可以同时执行这两项操作 - 除了查询之外,您仍然可以设置时间戳和递增模式。 它将简单地添加一个基于 timestamp.column.name 和/或 incrementing.column.name 字段的 where 语句。如果查询需要 where 语句,甚至可以使用子查询
例如,您可以将查询设置为:从中选择 *(从树中选择苹果,其中颜色 = 绿色)作为子查询
将 timestamp.column.name 设置为成熟后,Kafka 将执行的 SQL 为:
从中选择 *(从颜色 = 绿色的树中选择苹果)作为子查询,其中成熟>偏移日期