我想为我的kafka流创建互动条件。我只想知道这是可能的。
示例用例是:
我有单击" Kafka主题"流中的用户点击事件。用户在单击选项表中为自己定义最小点击计数,我想在他们达到最低点击计数时通知它们。KStream过滤器点击计数以限制。EventListener消费由KStream输出产生的主题数据并将通知发送给用户。
如何根据用户的持续数据来定义用户的kStream滤波器条件?当持久数据更改时,我可以更改它吗?
您需要创建两个主题:
-
user-prefs
-具有用户首选项,其中键是用户ID,值是最小点击数。(良好的做法是使其压实) -
clicks
-发送原始点击的主题,键是用户ID,值不重要(假设某些字符串)
使用kafkaproduducer,您将用户偏好(点击数量最少)发送到user-prefs
,如果它们更改,则需要发送新消息用户点击转到clicks
主题。
假设您想将它们汇总一段时间(60秒)。
首先,您必须分组和汇总点击,然后发送最终结果。之后,您将使用user-prefs
加入最终结果,其中保留了最小的点击次数。过滤器是根据汇总点击数量和最小点击次数制成的
KStream<String, Long> clicks = builder.<String, String>stream("clicks")
.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(1)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream().map((key, value) -> new KeyValue<>(key.key(), value));
KTable<String, Long> userPrefs = builder.<String, Long>table(
"user-prefs",
Consumed.with(Serdes.String(), Serdes.Long())
);
clicks.join(
userPrefs,
(userClicks, minUserClicksNumber) -> userClicks >= minUserClicksNumber,
Joined.with(Serdes.String(), Serdes.Long(), Serdes.Long())
)
.filter((userName, isSufficientNumberOfClick) -> isSufficientNumberOfClick)
.map(((key, value) -> new KeyValue<>(key, key)))
.to("output", Produced.with(Serdes.String(), Serdes.String()));