为什么此 KStream/KTable 拓扑会传播未通过筛选器的记录?



我有以下拓扑:

  1. 创建状态存储
  2. 根据SOME_CONDITION筛选记录,将其值映射到新实体,最后将这些记录发布到另一个主题STATIONS_LOW_CAPACITY_TOPIC

但是我在STATIONS_LOW_CAPACITY_TOPIC上看到了这一点:

�   null
�   null
�   null
�   {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
�   {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
�   null

也就是说,就好像它也将那些未通过过滤器的记录发布到STATIONS_LOW_CAPACITY_TOPIC主题。这怎么可能?如何防止它们被发布?

这是 ksteams 代码:

kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.filter { _, value -> SOME_CONDITION }
.mapValues { station ->
Stats(XXX)
}
.toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

更新:我已经简单地拓扑并打印了生成的表。出于某种原因,最终的 KTable 还包含与未通过筛选器的上游记录对应的空值记录:

kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.filter { _, value ->
val conditionResult = (SOME_CONDITION)
println(conditionResult)
conditionResult
}
.print()

原木:

false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)

答案在KTable.filter(...)的 javadoc 中:

请注意,更改日志流的过滤器与记录的工作方式不同 流过滤器,因为具有空值的记录(所谓的逻辑删除 记录(具有删除语义。因此,对于墓碑,提供的 不计算筛选器谓词,但逻辑删除记录为 如果需要,直接转发(即,如果有任何需要转发的内容 已删除(。此外,对于删除的每条记录(即点 不满足给定谓词(转发逻辑删除记录。

这就解释了为什么我看到向下游发送的空值(逻辑删除(记录。

为了避免这种情况,我将KTable转换为KStream,然后应用过滤器:

kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.toStream()
.filter { _, value -> SOME_CONDITION }
.mapValues { station ->
StationStats(station.id, station.latitude, station.longitude, ...)
}
.to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

结果:

4   {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
5   {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
...

最新更新