KStream过滤器是否消耗每个消息?



我以前使用过Kafka,但从未使用过流API。我的任务是构建一个可扩展的服务,该服务接受websocket连接,并根据用户id将出站消息从中心主题路由到正确的会话。

使用KStream看起来非常简单。从一个在线教程:

builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
.filter((name, publication) -> "George R. R. Martin".equals(publication.getName()))
.to(outputTopic, Produced.with(Serdes.String(), publicationSerde));

但是filter命令是否使用来自主题的每条消息并在应用程序空间中执行筛选?还是KStream过滤器(Predicate谓词)包含钩子到Kafka的内部工作,允许它只接收匹配正确密钥的消息?

KStreamJavadoc似乎建议使用前者:"逐个消息消费">

如果过滤器的唯一目的是消耗一个主题的所有消息,并丢弃那些不相关的消息,我可以手工完成。

您是正确的-需要对消息进行反序列化,然后根据谓词(在应用程序空间中)进行检查

扔掉那些不相关的,我可以手工做

当然可以,但是Kafka Streams有很有用的方法来定义会话窗口。此外,您不需要定义消费者和生产者实例来转发新主题。

相关内容

  • 没有找到相关文章

最新更新