Kafka 流 - 根据消息组设置不同的时间窗口



我想知道是否给定一个 KStream,可以根据消息组设置不同的时间窗口,例如,对于 groupBy "A" 5 秒,对于 groupBy "B" 10 秒......

KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.groupBy((key, msg) -> msg.getPool())
      .aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
      ...
想到

的最简单方法是在.groupBy()/.aggregate()之前.filter().branch(),例如:

KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.filter((key, msg) -> msg.getPool().equals("A"))
      .groupBy((key, msg) -> msg.getPool())
      .aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
      ...

相关内容

  • 没有找到相关文章

最新更新