Kafka偏移量管理并与DB同步



我正在开发一个使用Kafka流和数据库的应用程序。

在我的应用程序中,我手动管理Kafka偏移量,并仅在消息处理成功的情况下提交偏移量(即在处理和更新到DB成功后(。

然而,如果在更新数据库之后,我的应用程序在提交之前就关闭了,那么当它重新启动时,由于未提交的偏移量,它会导致对数据库的重复写入。

我想避免这些重复,同时仍然确保我正在处理每一条消息。做这件事的正确方法是什么?

编辑:我对DB的更新基本上是将记录的计数器增加一些值。因此,MERGE声明不是一种选择。

这有点棘手。

Kafka支持一次语义。但是,当您将数据写入外部数据存储时,您需要确保在消费者端只进行一次。

实现这一点的一种方法(正如Jay Kreps在这里提出的(是,在数据存储中维护Kafka偏移量,将其作为单个事务的一部分。因此,如果您为每个分区保留最后一个偏移量,那么当您收到的偏移量小于存储在DB中的偏移量时,您总是可以忽略来自给定分区的消息。

然而,这种方法需要注意。如果您有一个多数据中心主动-主动部署,其中如果主集群出现故障,消费者会回退到另一个不同的数据中心集群,则不能盲目依赖偏移量。Offset是一个物理id,一个集群中消息的偏移量可以不同于另一个集群上复制消息的偏移。

在这种情况下,我认为正确的方法是利用Kafka流,并在存储在压缩Kafka主题中的Kafka表(KTable(中维护计数。Kafka内部会使用生产者id、epoch、事务id等来确保只有一次语义。

最新更新