所以我在下面的主题中有一堆记录。我可以在KSQLDB中创建GroupBy,没有任何问题,因为它比其他任何东西都更像SQL。但我的任务是把它转移到Java KStreams上,结果失败得很惨。
有人能在拓扑结构上指导我首先按user_id、Object_id和day进行分组吗?我不会轻率地问这个问题,因为我已经在州立商店里尝试了很多例子,但我只是在追逐我的尾巴。基本上,我想知道用户在某一天看了特定对象多少次。
任何关于如何实现这一目标的内容都将不胜感激。
{
"entrytimestamp": "2020-05-04T15:21:01.897",
"user_id": "080db36a-f205-4e32-a324-cc375b75d167",
"object_id": "fdb084f7-5367-4776-a5ae-a10d6e898d22"
}
您可以创建组合键,然后按键分组,如:
KStream<String, Message> stream = builder.stream(MESSAGES, Consumed.with(Serdes.String(), jsonSerde));
KStream<String, Message> newKeyStream = stream.selectKey((key, message) ->
String.format("%s-%s-%s",
message.userId(),
message.objectId(),
LocalDate.ofInstant(Instant.ofEpochMilli(message.timestamp()), ZoneId.systemDefault())));
KGroupedStream<String, Message> groupedBy = newKeyStream.groupByKey();