Properties streamsConfiguration = this.buildKafkaProperties();
LOGGER.info("Kafka properties for streaming: {}", streamsConfiguration);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, LocationChangeEvent> kStream = builder.stream(this.kafkaConfigProperties.getTopicName(), Consumed.with(Serdes.String(), locationChangeEventSerde));
KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
// TimeWindows.of accepts a Duration directly; Long.parseLong(Duration.toString())
// throws NumberFormatException at runtime ("PT2M" is not a number)
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(2)));
Explanation: I want to remove duplicate keys from a Kafka stream. I have a KStream<String, LocationChangeEvent> kStream
...
Example: suppose I receive these events in the stream:
{id="1",event1},
{id="2",event2},
{id="3",event3},
{id="1",event3},
{id="2",event3}
Now I want to group them so that no duplicate keys (id) exist within a given time window. The output stream should be:
{id="1",event1},
{id="2",event2},
{id="3",event3}
That is, duplicate keys should be removed from the stream. I tried using KStream.groupByKey(), but it doesn't work for my case. I don't want to count unique keys; I want my KStream to contain only unique keys and their corresponding events.
You can use aggregate together with suppress. Example code:
KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
// grace(ZERO) lets suppress emit as soon as the window closes (the default grace period is 24h)
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
       // keep only the first event seen for each key within the window
       .aggregate(() -> null,
                  (key, value, agg) -> Optional.ofNullable(agg).orElse(value),
                  Materialized.with(Serdes.String(), locationChangeEventSerde))
       // unbounded() is a static import of Suppressed.BufferConfig.unbounded()
       .suppress(Suppressed.untilWindowCloses(unbounded()))
       .toStream()
       .map((windowedKey, value) -> new KeyValue<>(windowedKey.key(), value));
More details on suppress can be found in the Kafka Streams documentation.
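
For completeness, here is a minimal, self-contained sketch of the whole deduplication topology. The topic names, application id, bootstrap servers, event fields, and the JSON serde (from spring-kafka) are illustrative assumptions, not from the original post:

import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.kafka.support.serializer.JsonSerde;

public class DeduplicationTopology {

    // minimal stand-in for the asker's event type (fields are hypothetical)
    public static class LocationChangeEvent {
        public String id;
        public String payload;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "location-dedup-app"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical

        // assumed JSON serde from spring-kafka; any Serde<LocationChangeEvent> works
        Serde<LocationChangeEvent> eventSerde = new JsonSerde<>(LocationChangeEvent.class);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, LocationChangeEvent> events = builder.stream(
                "location-events", // hypothetical topic name
                Consumed.with(Serdes.String(), eventSerde));

        // the record key is already the id, so groupByKey avoids the repartition
        // that groupBy((k, v) -> k) would trigger
        events.groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
              // first event per key wins within each two-minute window
              .aggregate(() -> null,
                         (key, value, agg) -> Optional.ofNullable(agg).orElse(value),
                         Materialized.with(Serdes.String(), eventSerde))
              // hold results back until the window closes, so only one record per key is emitted
              .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
              .toStream()
              // unwrap the windowed key back into the plain String key
              .map((windowedKey, value) -> new KeyValue<>(windowedKey.key(), value))
              .to("location-events-deduped", Produced.with(Serdes.String(), eventSerde)); // hypothetical

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Note the trade-off: suppress buffers results until the window (plus grace) closes, so the deduplicated events are emitted with up to two minutes of latency rather than immediately.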