卡夫卡的原子消息处理



我想知道以原子方式处理关于kafka主题的消息的推荐方法,例如,假设kafka生产者发布了多个密钥为k1、k2、k3的消息,现在我想原子处理它们,并让我的应用程序一起了解这些消息。可以通过一些方法来实现这一点,例如,将id和count与需要一起处理的所有消息一起传递,以便客户端等待,直到它接收到属于同一组的所有消息。是否有其他推荐的方法来解决kafka的此类问题,即原子处理一批消息的能力,以便在密钥之间保持一致性。卡夫卡消费者提供这样的能力吗?

生产时要解决的方面

  • 在Kafka或任何其他消息代理中,生产者和消费者之间没有原子性
  • 所以,当生产者发送消息时,您必须有某种共同关系id作为消息的一部分,这样消费者就知道哪些消息属于同一组
  • 但是,对于消费者来说,仅仅知道哪些消息属于一起是不够的,它还应该知道何时应该认为属于某个特定组的消息已经被完全收集,以便开始处理一个组。到目前为止,例如,如果是固定大小的组,则不需要将任何组大小作为消息的一部分发送,否则,您需要在生成的消息中添加组大小或其他内容,以向消费者发出组已完成的信号
  • 您还应该生成属于同一分区的同一组的消息

现在您已经满足了基本需求,对于其余的工作,您可以选择不同的路径

  • 例如,您可以使用类似camel-kafkaaggregatorEIP的东西来使用此主题,并写入到不同的主题,其中每个记录都是整个组消息,然后您就知道可以原子化地使用该主题

最新更新