Kafka流计算属性值出现的次数



我是Kafka流的新手,对如何设计拓扑来实现这一需求感到困惑:

我传入的流媒体记录有密钥。每个键正好出现两次。第一次,数据如下:
<key> : {"sender": <Name>}
第二次值为空:
<key> : null
表示信号被取消。

例如,以下是传入流的示例:

"key_1" : {"sender":"Bob"}
"key_2" : {"sender":"Alice"}
"key_2" : null
"key_3" : {"sender":"Bob"}
"key_3" : null
"key_1" : null

我需要编写一个Kafka流应用程序来计算每个发送器有多少信号。例如,对于上面的传入流,我的应用程序应该输出:

Bob : 1
Alice : 1
Alice : 0
Bob : 2
Bob : 1
Bob : 0

实现这一点的拓扑应该是什么样的?我觉得可能需要加入,但我们不知道信号持续多久(同一个键的两个记录之间的时间(。

刚刚发现。这是KTable聚合的一个基本情况:

inputStream
.toTable()
.groupBy((key, event) -> KeyValue.pair(getSender(event), 1L),
Grouped.with(Serdes.String(), Serdes.Long()).withName("group-senderId")
)
.count();

相关内容

  • 没有找到相关文章

最新更新