Kafka MicrosoftSqlServerSource Connect -在主题中重复条目 &



我们需要设置Kafka MicrosoftSqlServerSource连接。这是为了捕获在Azure SQL数据库中的一个销售表中执行的所有事务(插入/更新)。

为了支持上述源连接,我们最初在数据库和表级都启用了CDC。我们还从源表中创建了一个视图,它将作为源连接的输入(在连接器配置中TableType = view)。一旦我们完成了连接器和数据库级别的设置,我们就可以看到,当在表级别发生新的更新/插入时,消息会流向与连接器一起自动创建的相应主题。

我们在测试时观察到的一个奇怪的行为是,当我们停止测试时,主题中收到的最后一条消息开始重复,直到新消息到达。

你能帮助我们理解这是否是一个系统行为吗?或者我们是否遗漏了任何导致这些重复条目的配置。请指导我们如何处理上述重复问题。

附加快照

连接器总结


Connector Class = MicrosoftSqlServerSource
Max Tasks = 1
kafka.auth.mode = SERVICE_ACCOUNT
kafka.service.account.id = **********
topic.prefix = ***********
connection.host = **************8
connection.port = 1433
connection.user = ***************
db.name = **************88
table.whitelist = item_status_view
timestamp.column.name = ProcessedDateTime
incrementing.column.name = SalesandRefundItemStatusID
table.types = VIEW
schema.pattern = dbo
db.timezone = Europe/London
mode = timestamp+incrementing
timestamp.initial = -1
poll.interval.ms = 10000
batch.max.rows = 1000
timestamp.delay.interval.ms = 30000
output.data.format = JSON

你所描述的是由

控制的
mode = timestamp+incrementing
poll.interval.ms = 10000

应该保存最后一个时间戳,然后只查询大于的时间戳。比上次……如果你得到的值大于或等于,那么这肯定是一个应该报告的错误

或者你应该阅读文档

时间戳列必须使用datetime2而不是datetime。如果时间戳列使用datetime主题可能会收到许多副本

作为一种替代方案,您可以使用Debezium(运行您自己的连接器,而不是使用Confluent Cloud产品)来真正流式传输所有表操作。

相关内容

  • 没有找到相关文章

最新更新