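Suppose I have the following event stream:
r1 - {"abc": "value 1"}
r2 - {"abc": "value 2"}
r3 - {"abc": "value 3"}
r4 - {"abc": "value 4"}
All records are in a single partition. From this stream I want to derive a new event stream
containing events such as {"abc": ["value 1", "value 2", "value 3", "value 4"]},
given that every record in the topic already carries the same key.
How can I do this with aggregate and groupByKey in the Kafka Streams API?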
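Here is an example for a stream of JSON events; you could try something like the following. Note that it collects the values per 5-minute time window and drops duplicate values. If you want a single running list per key instead, leave out the windowedBy(...) step; the result is then a KTable<String, JsonNode> and the store type becomes KeyValueStore<Bytes, byte[]>.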
// Assumed to be in scope: KStream<String, JsonNode> stream, ObjectMapper mapper,
// Serde<JsonNode> jsonNodeSerde, and String fieldName (here "abc").
// On Kafka 3.0+, TimeWindows.ofSizeWithNoGrace(...) replaces the deprecated TimeWindows.of(...).
KTable<Windowed<String>, JsonNode> timeWindowedAggregatedStream = stream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> mapper.createObjectNode(), /* initializer: empty JSON object */
        (aggKey, newValue, aggValue) -> { /* adder */
            final ObjectNode aggregate = (ObjectNode) aggValue;
            // Reuse the array collected so far, or start a new one on the first record.
            final ArrayNode arrayNode = aggregate.has(fieldName)
                ? (ArrayNode) aggregate.get(fieldName)
                : mapper.createArrayNode();
            // Skip records that lack the field instead of appending a Java null.
            if (newValue.hasNonNull(fieldName)) {
                arrayNode.add(newValue.get(fieldName));
            }
            // Remove duplicates; LinkedHashSet keeps the arrival order.
            final Set<JsonNode> unique = new LinkedHashSet<>();
            arrayNode.forEach(unique::add);
            final ArrayNode uniqueArrayNode = mapper.createArrayNode();
            uniqueArrayNode.addAll(unique);
            aggregate.set(fieldName, uniqueArrayNode);
            return aggregate;
        },
        Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
            .withValueSerde(jsonNodeSerde)); /* serde for aggregate value */
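
To see what the initializer and the adder compute without a running broker, the same fold can be sketched in plain Java. This is only an illustration under assumptions: ListAggregationSketch and its aggregate method are hypothetical names, records are modeled as key/value string pairs, and a LinkedHashMap stands in for the state store. It is not Kafka Streams code and it ignores windowing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for groupByKey().aggregate(...): no Kafka involved.
final class ListAggregationSketch {

    // Folds (key, value) records into one ordered, duplicate-free list per key.
    static Map<String, List<String>> aggregate(List<Map.Entry<String, String>> records) {
        // Plays the role of the state store: the current aggregate for each key.
        Map<String, List<String>> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            // Initializer: start from an empty list the first time a key is seen.
            List<String> aggregate = store.computeIfAbsent(record.getKey(), k -> new ArrayList<>());
            // Adder: append the new value unless it is already present.
            if (!aggregate.contains(record.getValue())) {
                aggregate.add(record.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
            Map.entry("k", "value 1"),
            Map.entry("k", "value 2"),
            Map.entry("k", "value 3"),
            Map.entry("k", "value 4"));
        // prints {k=[value 1, value 2, value 3, value 4]}
        System.out.println(aggregate(records));
    }
}
```

In Kafka Streams the adder runs once per incoming record and the resulting KTable is updated each time, so downstream consumers will typically see intermediate lists before the complete one.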