我们如何重置与Kafka Connect源连接器相关的状态?



我们正在使用Kafka Connect 2.5.

我们正在使用Confluent JDBC源连接器(尽管我认为这个问题主要与连接器类型无关),并且正在使用来自IBM DB2数据库的一些数据到主题上,使用'递增模式'(主键)作为每个记录的唯一id。

这在正常情况下很好;连接器第一次启动时,所有记录都被消耗并放置在主题上,然后,当添加新记录时,它们被添加到我们的主题中。在我们的开发环境中,当我们更改连接器参数等时,我们希望有效地按需重置连接器;也就是说,让它再次从表的"开始"处消费数据。

我们认为删除连接器(使用Kafka Connect REST API)会做到这一点-并且会有从Kafka Connectconnect-*元数据主题中删除有关连接器配置的所有信息的副作用。

然而,这似乎并没有发生。元数据保留在这些主题中,当我们重新创建/重新添加连接器配置(同样使用REST API)时,它会"记住"它从表中消费的偏移量。这似乎令人困惑且毫无帮助——删除连接器并不会删除其状态。有没有一种方法可以更永久地擦除连接器和/或重置其消费位置,而不是拉下整个Kafka Connect环境,这看起来很激烈?理想情况下,我们不希望直接干预内部主题。

对这个问题的部分回答:似乎我们看到的行为是预料之中的:

如果你使用增量摄取,Kafka连接的偏移量是多少存储吗?如果您删除并重新创建具有相同属性的连接器名称时,将保留前一个实例的偏移量。考虑创建连接器的场景。它成功摄取源中给定ID或时间戳值之前的所有数据表,然后删除并重新创建它。新版本的连接器将从以前的版本获得偏移量,因此只有摄取比先前处理过的数据更新的数据。你可以通过查看offset.storage.topic及其值来验证这一点为所讨论的表存储在其中。

至少对于Confluent JDBC连接器,有一个重置指针的解决方案。

就我个人而言,我仍然很困惑为什么Kafka Connect在删除时保留连接器的状态,但似乎这是设计的行为。如果有更好的(和支持的)方法来删除该状态,我仍然感兴趣。

另一篇相关博客文章:https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/

相关内容

  • 没有找到相关文章

最新更新