如何使用窗口将时间t从单个分区分组到单个记录,每个记录都具有相同的键



假设我有事件流。

r1- {" abc":" value 1"}

r2- {" abc":" value 2"}

r3- {" abc":" value 3"}

r4- {" abc":" value 4"}

在一个分区中。我希望从上面流的派生事件流具有

之类的事件

{" abc":[" value 1"," value 2"," value 3"," value 4"]}

给定每个记录主题中已经可以使用相同的密钥。

在kafka stream api中使用聚合和groupbykey我该怎么做?

这是JSON事件流的示例,您可以尝试以下内容:

KTable<Windowed<String>, JsonNode> timeWindowedAggregatedStream = stream.groupByKey().windowedBy(Duration.ofMinutes(5))
    .aggregate(
        () -> objectMapper::createObjectNode, /* initializer */
        (aggKey, newValue, aggValue) -> {
            final JsonNode element = value.has(fieldName) && value.get(fieldName) != null ? value.get(fieldName) : null;
        final ArrayNode arrayNode = aggregate == null || aggregate.get(fieldName) != null
                ? (ArrayNode) aggregate.get(fieldName)
                : mapper.createArrayNode();
        arrayNode.add(element);
        // TO remove duplicates
        Stream<Object> elementStream = IntStream.range(0, arrayNode.size()).mapToObj(arrayNode::get);
        Set<Object> arrayAsSet = elementStream.collect(Collectors.toSet());
        ObjectNode aggregateNode = mapper.createObjectNode();
        ArrayNode uniqueArrayNode = mapper.valueToTree(arrayAsSet);
        aggregate.set(fieldName, uniqueArrayNode); 
        return aggregate;
} , /* adder */
        Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(jsonNodeSerde)); /* serde for aggregate value */

相关内容

  • 没有找到相关文章

最新更新