如何从多线程卡夫卡生产者实现广告顺序



我有一个带有历史记录的Ingres DB表,它记录了插入更新和删除等数据库事件。我有一个多线程的生产者。该生产者将读取历史记录表以查找要选择的表和行,然后将该行添加 Kafka 主题。现在,生产者需要确保将事件添加到 Kafka 主题中,就像历史记录表登录一样。因此,使用者以与历史记录表记录相同的顺序读取它们,并在Postgrace DB上执行它。

我可以将这些数据生成到多个生产者中。例

Producer1 has message 1 to 5
producer2 has message 6 to 10
producer3 has message 11 to 15

但是当我消费时,我会收到有关以下主题的消息

messageId 1
messageId 2
messageId 3
messageId 6
messageId 7
messageId 11

等等

我想按以下顺序获取所有消息

messageId 1
messageId 2
messageId 3
messageId 4
messageId 5
messageId 6
messageId 7
messageId 8
messageId 9

等等

注意 : - 我有 1 个主题和 1 个分区和 1 个消费者

Kafka 在发送数据时不保证顺序,因为默认情况下每个主题有多个分区,如果您没有密钥,消息会随机分配给分区。 而在下游,每个分区都可以独立使用。

如果需要保证插入和使用顺序,则需要将 Kafka 主题配置为仅使用 1 个分区。 这是保证卡夫卡秩序的唯一途径。但是,您将失去 kafka 的许多好处,即分布在多个服务器、内核等上的高性能。

您最多可以通过发送到单个分区来保持消息的顺序,使其按生产者创建的顺序排列。Kafka 分区保证了使用消息的顺序,按照在分区中创建消息的顺序。

在您的方案中,消息由多个生成者生成,并且它们不同步以按顺序使用消息填充分区。因此,不可能像您期望的那样在消费者端实现订单。

根据 Google 的建议,如果您使用同步发布商(生产者)和单个订阅者,请遵循页面后半部分 Node JS 代码中的算法,以保证处理顺序。

同样,如果您有多个发布者,则需要通过在 getPublishCounterValue 方法和 setPublishCounterValue 方法之间设置一个关键部分来同步发布者,这破坏了发布者的多线程性质。

最好的解决方案是遵循以下部分

最终结果的顺序很重要

典型用例:日志、状态更新

多线程发布商必须为每条发布/订阅事件消息附加时间戳,以便订阅者可以将事件消息作为实体存储在 Google Cloud 数据存储或 Firestore 中。单独的事件消息处理器 cron 作业可以按时间戳排序的方式检索事件消息的实体,以强制实施消息排序。

最新更新