一个接一个处理卡夫卡话题



我是卡夫卡的新手。

在我们的项目中,有一个源系统将在不同的主题中发布不同类型的json消息(类型1、类型2(。我们有春季微服务,卡夫卡的听众可以收听这些主题。我们必须将源消息转换为不同的json格式,并将其发送到另一个REST API,但这里需要注意的是,在发送typ2消息之前,我们必须首先处理所有的type1消息。如果我们早于type1发送type2,我们将从REST API获得错误,因为它无法在没有type1的情况下处理type2。

我们在每条消息中都有一个参数,它告诉我们消息是类型1还是类型2

我们曾考虑使用这些方法。我仍然在读关于春天卡夫卡的书,甚至不知道这些是可能的,所以如果有些事情是愚蠢的或不可能的,请原谅。

  1. 阅读所有消息,在处理type2之前先处理type1(将消息保存在kafka主题中(。这可能吗?这就像如果消息是type2,现在什么都不要做
  2. 读取每条消息,只处理type1,如果是type2,则将其保存在db中,以便在完成所有type1消息后进行处理。要做到这一点,我需要知道是否所有type1消息都已处理。我该如何检查
  3. 考虑让源团队处理特定主题中的类型1消息和另一主题中的2消息。在春季,一个listner阅读类型1主题,另一个lister阅读类型2主题。默认情况下,暂停type2 listener。首先处理类型1主题,如果完成了,请继续处理类型2侦听器

或者如果你能帮助我更好的方法,我会很棒的。

如果您需要任何其他详细信息,请帮助我并让我知道。提前谢谢。

最简单、最高效的方法是使用2个不同的主题/侦听器,并控制侦听器容器的生命周期(例如,使用空闲容器事件来知道何时处理了所有类型1(。

请参阅此问题,以获取相关示例和文档。

1也会起作用;只需将分区倒带(寻找(到初始位置,然后从同一位置重新处理。请参阅文档。

最新更新