Kafka流的交互式条件



我想为我的kafka流创建互动条件。我只想知道这是可能的。

示例用例是:

我有单击" Kafka主题"流中的用户点击事件。用户在单击选项表中为自己定义最小点击计数,我想在他们达到最低点击计数时通知它们。KStream过滤器点击计数以限制。EventListener消费由KStream输出产生的主题数据并将通知发送给用户。

如何根据用户的持续数据来定义用户的kStream滤波器条件?当持久数据更改时,我可以更改它吗?

您需要创建两个主题:

  • user-prefs-具有用户首选项,其中键是用户ID,值是最小点击数。(良好的做法是使其压实)
  • clicks-发送原始点击的主题,键是用户ID,值不重要(假设某些字符串)

使用kafkaproduducer,您将用户偏好(点击数量最少)发送到user-prefs,如果它们更改,则需要发送新消息用户点击转到clicks主题。

假设您想将它们汇总一段时间(60秒)。

首先,您必须分组和汇总点击,然后发送最终结果。之后,您将使用user-prefs加入最终结果,其中保留了最小的点击次数。过滤器是根据汇总点击数量和最小点击次数制成的

KStream<String, Long> clicks = builder.<String, String>stream("clicks")
    .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(1)))
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream().map((key, value) -> new KeyValue<>(key.key(), value));
KTable<String, Long> userPrefs = builder.<String, Long>table(
    "user-prefs",
    Consumed.with(Serdes.String(), Serdes.Long())
);
clicks.join(
    userPrefs,
    (userClicks, minUserClicksNumber) -> userClicks >= minUserClicksNumber,
    Joined.with(Serdes.String(), Serdes.Long(), Serdes.Long())
)
    .filter((userName, isSufficientNumberOfClick) -> isSufficientNumberOfClick)
    .map(((key, value) -> new KeyValue<>(key, key)))
    .to("output", Produced.with(Serdes.String(), Serdes.String()));

最新更新