一个KStream到多个主题使用重复的消息;广播



在Kafka流拓扑中,我想使用KStream从传入主题路由流,增强消息,并输出到目标主题。然而,在流的中间,我还想将所有消息发送到另一个主题。

KStream<Integer, GenericRecord> ksDevice = builder.stream("source_topic", Consumed.with(Serdes.Integer(), genericRecordSerde));
KStream<Integer, GenericRecord> ksEnhance = builder.stream("enhance_topic", Consumed.with(Serdes.Integer(), genericRecordSerde));
ksDevice.join(ksEnhance, (k, v) -> /* enhance message */))
.to("orthogonal_topic")
.to("destination_topic")

这显然是不可能的,因为.to()会终止流。我也不能使用kstream.branch(),因为这会将消息发送到一个主题或另一个主题;我需要将所有消息发送到两个主题。

我看到一个帖子,作者建议你可以使用kstream.filter().to("topic")kstream.filter(),("topic"将消息流式传输到多个主题但是如果不终止流,我看不出这是怎么做到的。

链接文章正确。在你的情况下,你不需要过滤器。创建一个中间变量

KStream k = ksDevice.join(...)
k.to()
k.to()

也可以使用.through().to()

但是如前所述,您不必要地在代理上复制完全相同的数据。可以使用两个不同的消费者组来代替从一个主题中读取。

最新更新