Kafka Streams:我们是否应该提前每个键的流时间来测试窗口化抑制



我从这个博客和本教程中了解到,为了用事件时间语义测试抑制,应该发送伪记录来提前流时间。我试着通过这样做来提前时间。但是,除非为某个特定的密钥提前时间,否则这似乎不起作用。

我有一个自定义CCD_ 1;流时间";与记录。我的流拓扑伪代码如下(我使用Kafka Streams DSL API(:

source.mapValues(someProcessingLambda)
.flatMap(flattenRecordsLambda)
.groupByKey(Grouped.with(Serdes.ByteArray(), Serdes.ByteArray()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
.aggregate(()->null, aggregationLambda)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

我的输入格式如下:

1 - {"stream_time":"2019-04-09T11:08:36.000-04:00", id:"1", data:"..."}
2 - {"stream_time":"2019-04-09T11:09:36.000-04:00", id:"1", data:"..."}
3 - {"stream_time":"2019-04-09T11:18:36.000-04:00", id:"2", data:"..."}
4 - {"stream_time":"2019-04-09T11:19:36.000-04:00", id:"2", data:"..."}
.
.

现在记录12属于10分钟窗口,而stream_time34属于另一个。在该窗口中,记录将按照id进行聚合。我预计记录3会发出流已经前进的信号,并导致抑制发射对应于第一个窗口的数据。然而,直到我发送一个带有id:1的伪记录来提前该密钥的流时间,数据才会被发送。

我是否错误地理解了测试说明?这是预期的行为吗?伪记录的密钥重要吗?

很抱歉给您带来麻烦。这确实是一个棘手的问题。我有一些想法可以添加一些操作来支持这种集成测试,但很难不破坏基本的流处理时间语义。

这听起来像是在测试一个"真正的"KafkaStreams应用程序,而不是使用TopologyTestDriver进行测试。我的第一个建议是,如果TopologyTestDriver满足您的需求,那么您将有更好的时间使用它来验证您的应用程序语义。

在我看来,您的输入主题(以及应用程序(中可能有多个分区。如果键1转到一个分区,而键3转到另一个分区的话,您会看到您所观察到的内容。应用程序的每个分区都独立地跟踪流时间。TopologyTestDriver工作得很好,因为它只使用一个分区,而且它同步处理数据。否则,您将不得不制作"伪"时间提前消息,以便将其与您试图清除的密钥放在同一分区。

这将特别棘手,因为您的"flatMap((.groupByKey(("将重新分配数据。您必须制作伪消息,以便在重新分区后将其放入正确的分区。或者,您可以尝试将虚拟消息直接写入重新分区主题。

如果您确实需要使用KafkaStreams而不是TopologyTestDriver进行测试,我想最简单的事情就是按照您在问题中的建议,为每个密钥编写一条"时间推进"消息。不是因为这是绝对必要的,而是因为这是满足所有这些注意事项的最简单方法。我还要提到的是,我们正在对Kafka Streams中的流时间处理进行一些一般性改进,这应该会大大简化这种情况,但当然,这对您现在没有帮助。

最新更新