KStreamWindowagGregate似乎共享流式时期,导致Windows到期



由于窗口过期而丢弃的消息,即使对于该特定键,窗口不应关闭

我想根据事件时间将从单个分区主题和窗口窗口分组为30秒。为了避免立即处理,我调用抑制方法并使用.grace方法。窗口关闭后(30秒后 宽限期为0(,我希望将最终结果添加到一个主题中。我从主题中消耗的消息有两个不同的键:300483976和3004853339。我消耗的消息将事件时间增加10秒。我读到,仅根据增加事件时间的新消息而增加了流时间。这也是我经历的。但是我看到的问题是以下内容:

我消耗了密钥300483976的前10条消息。基于" kstreamwindowaggregate.process"方法,我注意到InternalProcessorContext.StreamTime((确实每次都会根据最新的消费消息增加。处理10条消息后,最终事件时间为Start Time 300秒。在那一刻之后,消耗了密钥300485339的消息。所有这些,但最新消息被标记为已过期并带有"跳过过期窗口的记录"的消息。似乎InternalProcessorContext.StreamTime((仍然记得第一行的最新值,因此用密钥300485339丢弃了消息。

stream
                .groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
                .windowedBy(
                        TimeWindows.of(Duration.ofSeconds(30))
                                .grace(Duration.ofMillis(0))) // override the default of 24 hours
                .aggregate(Data::new, transform(), materialize())
                .filter((key, value) -> {
                    log.info("agg {} {}", key, value.toString());
                    return true;
                })
                .suppress(
                        Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream();

我希望,随着消息由密钥(300483976和300485339(分组,流时间不会"共享"。我希望钥匙300483976和密钥3004853339。我正在使用Kafka-streams 2.1.0和一个从消息中的字段获取事件时间的时间Stampextractor。

update

我进行了一些其他测试,并改编了一个不使用聚合的示例,但在流时间中确实显示了相同的问题:

    @Test
    public void shouldSupportFinalResultsForTimeWindows() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, Long> valueCounts = builder
                .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
                .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
                .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
        valueCounts
                .suppress(untilWindowCloses(unbounded()))
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final ConsumerRecordFactory<String, String> recordFactory =
                new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
            driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
            // note this last records sets the streamtime to 7L causing the next messages to be discarded
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
        }
    }

在上面的示例中,第二个消息将流时间设置为7L,即使消息具有不同的键,也可以关闭创建的0到2的窗口。这会导致接下来的几条消息被丢弃,即使键是K1。因此,从这个示例中可以明显看出密钥未考虑。如果这实际上是设计的方式,我想知道这是什么情况。尤其是当我认为一个主题具有带有不同分区的消息和一个主管可能与其他组件的流(起源于eventtime(的消息完全不同时,很普遍。希望您能为此提供一些启示?

观察到的行为是通过设计。显然, stream - 时间都在所有消息中跟踪(不是 sub stream time(。

您看到的"问题"是,您的输入数据是秩序的(仅放置键和TS(:

(k1, 1), (k1, 2), (k1, 3), (k2, 1), (k2, 2), (k3, 3)

时间不会单调地增加,即用密钥k2的记录相对于带有键k1的记录的记录。因为您将宽限期设置为零,所以您告诉Kafka流不允许使用无序数据(或实际上仅在窗口内的某些零件外数据(。因此,结果只会如您所期望的那样,对于带有交错键的有序数据流但单调增加时间戳(:

(k1, 1), (k2, 1), (k1, 2), (k2, 2), (k1, 3), (k3, 3)

如果您有端外数据,则应相应地将宽限期设置为高(零仅适用于有序数据流(。

相关内容

  • 没有找到相关文章

最新更新