我以前使用过Kafka,但从未使用过流API。我的任务是构建一个可扩展的服务,该服务接受websocket连接,并根据用户id将出站消息从中心主题路由到正确的会话。
使用KStream
builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
.filter((name, publication) -> "George R. R. Martin".equals(publication.getName()))
.to(outputTopic, Produced.with(Serdes.String(), publicationSerde));
但是filter命令是否使用来自主题的每条消息并在应用程序空间中执行筛选?还是KStream
KStream
如果过滤器的唯一目的是消耗一个主题的所有消息,并丢弃那些不相关的消息,我可以手工完成。
您是正确的-需要对消息进行反序列化,然后根据谓词(在应用程序空间中)进行检查
扔掉那些不相关的,我可以手工做
当然可以,但是Kafka Streams有很有用的方法来定义会话窗口。此外,您不需要定义消费者和生产者实例来转发新主题。