我们需要设置Kafka MicrosoftSqlServerSource连接。这是为了捕获在Azure SQL数据库中的一个销售表中执行的所有事务(插入/更新)。
为了支持上述源连接,我们最初在数据库和表级都启用了CDC。我们还从源表中创建了一个视图,它将作为源连接的输入(在连接器配置中TableType = view)。一旦我们完成了连接器和数据库级别的设置,我们就可以看到,当在表级别发生新的更新/插入时,消息会流向与连接器一起自动创建的相应主题。
我们在测试时观察到的一个奇怪的行为是,当我们停止测试时,主题中收到的最后一条消息开始重复,直到新消息到达。
你能帮助我们理解这是否是一个系统行为吗?或者我们是否遗漏了任何导致这些重复条目的配置。请指导我们如何处理上述重复问题。
附加快照
连接器总结
Connector Class = MicrosoftSqlServerSource
Max Tasks = 1
kafka.auth.mode = SERVICE_ACCOUNT
kafka.service.account.id = **********
topic.prefix = ***********
connection.host = **************8
connection.port = 1433
connection.user = ***************
db.name = **************88
table.whitelist = item_status_view
timestamp.column.name = ProcessedDateTime
incrementing.column.name = SalesandRefundItemStatusID
table.types = VIEW
schema.pattern = dbo
db.timezone = Europe/London
mode = timestamp+incrementing
timestamp.initial = -1
poll.interval.ms = 10000
batch.max.rows = 1000
timestamp.delay.interval.ms = 30000
output.data.format = JSON
你所描述的是由
控制的mode = timestamp+incrementing
poll.interval.ms = 10000
它应该保存最后一个时间戳,然后只查询大于的时间戳。比上次……如果你得到的值大于或等于,那么这肯定是一个应该报告的错误
或者你应该阅读文档
时间戳列必须使用
datetime2
而不是datetime
。如果时间戳列使用datetime
,主题可能会收到许多副本
作为一种替代方案,您可以使用Debezium(运行您自己的连接器,而不是使用Confluent Cloud产品)来真正流式传输所有表操作。