Kafka 流等待函数与依赖对象



我创建了一个Kafka Streams应用程序,该应用程序从不同的主题接收不同的JSON对象,我想实现某种等待功能,但我不确定如何最好地实现它。

为了简化问题,我将在下一节中使用简化的实体,我希望这个问题可以很好地描述。 因此,在我的一个流中,我收到汽车对象,每辆车都有一个ID。在第二个流中,我接收人员对象,每个人都有一个汽车 ID,并被分配给具有此 ID 的汽车。

我想使用我的 Kafka Streams 应用程序从两个输入流(主题)中读取,并使用具有相同汽车 ID 的四个人来丰富汽车对象。仅当所有四个人都包含在汽车对象中时,才应将汽车对象转发到下一个下游处理器。

我计划为汽车创建一个输入流,为人员对象创建一个输入流,将 JSON 数据解析为内部对象表示形式,将两个流合并在一起,并在合并的流上应用"selectKey"函数以从实体中提取键。 之后,我会将数据推送到自定义转换函数中,该函数包含状态存储。在这个转换函数中,我将每个到达的汽车对象及其 id 存储在状态存储中。一旦新的人员对象到达,我会将它们添加到状态存储中的相应汽车对象中(请忽略此处迟到汽车的情况)。只要四个人在汽车对象中,我就会将该对象转发到下一个流函数,并将汽车对象从状态存储中删除。

这是一个合适的方法吗?我不确定可伸缩性,因为我必须确保在运行多个实例时,具有相同 id 的汽车和人员对象将由同一应用程序实例处理。我会为此使用选择键函数,这行得通吗?

谢谢!

基本设计对我来说看起来很合理。

但是,selectKey()本身是不够的,因为transform()(与DSL运算符相比)不会触发自动重新平衡。因此,您需要通过through()手动重新平衡。

stream.selectKey(...)
.through("user-created-topic")
.transform(...);

https://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning

相关内容

  • 没有找到相关文章

最新更新