如何只处理kafkaStreams中的唯一密钥


Properties streamsConfiguration = this.buildKafkaProperties();
LOGGER.info("kafka properties for streaming is ::{}", streamsConfiguration);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, LocationChangeEvent> kStream = builder.stream(this.kafkaConfigProperties.getTopicName(), Consumed.with(Serdes.String(), locationChangeEventSerde));
KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
grouped.windowedBy(TimeWindows.of(Long.parseLong(String.valueOf(Duration.ofMinutes(2)))));

说明:我想从kafka流中删除重复的密钥。我有KafkaStreams<String,LocationChangeEvent> kstreams。。。

示例-假设我在kafkaStreams 中得到这些事件

{id="1",event1},
{id="2",event2},
{id="3",event3},
{id="1",event3},
{id="2",event3}

现在,我想对它们进行分组,以便在给定的时间范围内不存在重复的密钥(id(。输出kafkaStream:

{id="1",event1},
{id="2",event2},
{id="3",event3}

从CCD_ 3中移除重复密钥。尝试使用Kstreams.groupByKey(),但它不适用于我的情况。我不想计算唯一的钥匙。我希望我的Kstream只包含唯一的密钥和相应的事件。

您可以将aggragetesuppress一起使用。示例代码如下:

KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.aggregate(null, (key, value, agg) -> Optional.ofNullable(agg).orElse(value))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((windowedKey, value) -> new KeyValue<>(windowedKey.key(), value));

有关抑制的更多详细信息,您可以在这里找到

相关内容

  • 没有找到相关文章

最新更新