当加入两个从 TimeWindows 派生的 KTable 时,时间窗口是否相同



>我对两个不同的KTable使用两种不同的保留时间,它适用于RocksDB State和changelog Kafka Topics。

KTable由KStream和groupBy生成,然后windowedBy

我相信当KStream与窗口连接时,TimeWindows是一样的。我想知道如果TimeWindows参数不同,在加入两个不同的 KTable 窗口时会有好处还是缺点TimeWindows

代码片段:

final KStream<Integer, String> eventStream = builder.stream("events",
                        Consumed.with(Serdes.Integer(), Serdes.String())
                                .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> eventWindowTable = eventStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).until(Duration.ofSeconds(100).toMillis()))
                .reduce((oldValue, newValue) -> newValue);
final KStream<Integer, String> clickStream = builder.stream("clicks",
                Consumed.with(Serdes.Integer(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> clickWindowTable = clickStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).until(Duration.ofSeconds(70).toMillis()))
                .reduce((oldValue, newValue) -> newValue);
final KTable<Windowed<Integer>, String> join = eventWindowTable.leftJoin(clickWindowTable,
                (event, click) -> event + " ; " + click + " ; " + Instant.now()
        );

最初,我认为使用不同的TimeWindows参数连接两个不同的 KTable 将不起作用,因为连接依赖于时隙的键 TimeWindowedKey。但经过测试,它也可以工作。

执行连接是因为两个键的类型相同:Windowed<Integer> 。当然,只有在键相同时,联接才会产生结果。假设您有以下窗口(请注意,仅存储窗口开始时间戳TimeWindows(:

eventWindowTable: <A,0>        <A,60>       
clickWindowTable: <A,0> <A,30> <A,60> <A,90>

在这种情况下,只有<A,0><A,60>会加入。因此,具有不同的窗口确实会影响您的结果,因为窗口开始时间戳是键的一部分,并且某些窗口永远不会加入(例如,在我们的示例中<A,30><A,90>(。

最新更新