我是Kafka流的新手,对如何设计拓扑来实现这一需求感到困惑:
我传入的流媒体记录有密钥。每个键正好出现两次。第一次,数据如下:<key> : {"sender": <Name>}
第二次值为空:<key> : null
表示信号被取消。
例如,以下是传入流的示例:
"key_1" : {"sender":"Bob"}
"key_2" : {"sender":"Alice"}
"key_2" : null
"key_3" : {"sender":"Bob"}
"key_3" : null
"key_1" : null
我需要编写一个Kafka流应用程序来计算每个发送器有多少信号。例如,对于上面的传入流,我的应用程序应该输出:
Bob : 1
Alice : 1
Alice : 0
Bob : 2
Bob : 1
Bob : 0
实现这一点的拓扑应该是什么样的?我觉得可能需要加入,但我们不知道信号持续多久(同一个键的两个记录之间的时间(。
刚刚发现。这是KTable聚合的一个基本情况:
inputStream
.toTable()
.groupBy((key, event) -> KeyValue.pair(getSender(event), 1L),
Grouped.with(Serdes.String(), Serdes.Long()).withName("group-senderId")
)
.count();