聚合用例-通过两个不同的kafka流进行分组并计算平均值



我被困在一个用例中。我需要计算一下娱乐支出的百分比。

在我的流中,我得到了具有行业代码和支出金额的记录(行业代码是基于娱乐和非娱乐的(

例如>娱乐行业代码>行业代码金额

157       100
257        200
157         300

非娱乐行业代码>行业代码金额

457       100
657       200
457       300

所以我需要计算在娱乐上花费了多少%,应该是娱乐金额的总和/(娱乐金额的总额+非娱乐金额的总数(

我正在尝试的解决方案是:创建两个Ktables一个娱乐-按娱乐代码分组,即。157 400(100+300(

257    100

非娱乐另一KTable 457 400

657      200

我现在有两个KTable,但我如何计算%?这种方法正确吗?

我不完全确定你到目前为止到底尝试了什么。当您创建两个表时,两个表都有多行(即,每个代码一行(还是您已经";合并";不同的代码在一起?如果每个表包含多行,则需要通过设置一个虚构的代理项(如所有行的整数值0(将所有行聚合在一起:

KStream sumSpendingEntertainment = spendingEntertainment.groupBy((k,v) -> 0)
.aggregate(...);
KStream sumSpendingAll = spendingAll.groupBy((k,v) -> 0)
.aggregate(...);

最后,您可以得到两个具有单行的KTable;一个包含总体支出;娱乐支出;并且两个KTables将使用相同的组成代理密钥(在我们的示例中为整数0(。

在最后一步中,您可以连接两个表来划分两个总和:

sumSpendingEntertainment.join(sumSpendingAll,
(sumEnt, sumAll) -> sumEnt / sumAll); // this is the `ValueJoiner` expressed as lambda

相关内容

  • 没有找到相关文章

最新更新