KTable-KTable FK join: can't select foreign key serde



Summary

I'm trying to do a KTable-KTable foreign-key join, but I get an error because Kafka Streams is trying to use a String serde for the foreign key.

I would like it to use a Kotlinx Serialization serde instead. How can I specify this?

Details

I want to join the data of two KTables together, using a foreign-key selector, and re-map the values into an aggregating object.

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes KTable
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()` - helper function to make a Serde from a Kotlinx Serialization JSON module
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )
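
For context, the `.serde()` calls above come from a small helper that builds a Serde from a Kotlinx Serialization `Json` instance (the real helper lives in the kotka-streams file linked in the comment). A minimal sketch of that idea, written from scratch here rather than copied from the library, might look like this:

import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer

// Illustrative sketch only - not the actual kotka-streams implementation linked above.
inline fun <reified T : Any> Json.serde(): Serde<T> {
  // serialise values to JSON bytes using the Kotlinx serializer for T
  val kafkaSerializer = Serializer<T> { _, data ->
    data?.let { encodeToString(serializer<T>(), it).encodeToByteArray() }
  }
  // deserialise JSON bytes back into a T
  val kafkaDeserializer = Deserializer<T> { _, bytes ->
    bytes?.let { decodeFromString(serializer<T>(), it.decodeToString()) }
  }
  return Serdes.serdeFrom(kafkaSerializer, kafkaDeserializer)
}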

However, I get an error, because Kafka Streams is using Serdes.String() (my default serde) to deserialize the foreign key. But the key is a JSON object, and I want it to use Kotlinx Serialization.

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in 
StreamConfig or provide correct Serdes via method 
parameters. Make sure the Processor can accept the 
deserialized input of type key: myproject.MyTopology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause 
of error, the cast exception might have another cause 
(in user code, for example). For example, if a 
processor wires in a store, but casts the generics 
incorrectly, a class cast exception could be raised 
during processing, but the cause would not be wrong Serdes.
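
For reference, the default serdes the error message refers to are the ones set in the Streams configuration; a String default typically comes from configuration along these lines (an illustrative snippet, not the exact config from my project):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Illustrative only: Kafka Streams falls back to these defaults wherever
// an operation is not given explicit serdes.
val streamsProps = Properties().apply {
  put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
  put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java.name)
}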

Background

The data I'm processing comes from a computer game. The game has a map, called a surface. Each surface is uniquely identified by a surface index. Each surface has tiles, on an x/y plane. Each tile has a 'prototype name', which is the ID of a TilePrototype. Each TilePrototype has information about what that tile does or looks like. I need it for the tile's colour.

Topology

Group tiles by chunk

First I group the tiles into 32x32 chunks, and then group those into a KTable.

/** Each chunk is identified by the surface, and an x/y coordinate */
@Serializable
data class MapChunkDataPosition(
  val position: MapChunkPosition,
  val surfaceIndex: SurfaceIndex,
)

/** Each chunk has 32 tiles */
@Serializable
data class MapChunkData(
  val chunkPosition: MapChunkDataPosition,
  val tiles: Set<MapTile>,
)

// get all incoming tiles and group them by chunk,
// this works successfully
val tilesGroupedByChunk: KTable<MapChunkDataPosition, MapChunkData> =
  buildChunkedTilesTable(tilesTable)
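
buildChunkedTilesTable works and isn't the problem, so its implementation is left out; roughly, it re-keys each tile into the 32x32 chunk it belongs to and aggregates the tiles per chunk. A hypothetical sketch of that shape (the TilePosition input type, the MapChunkPosition constructor, and the store names are assumptions for illustration, not my real code; imports are the same Kafka Streams / kotlinx.serialization / kotka-streams ones used throughout):

// Hypothetical sketch - NOT the real buildChunkedTilesTable.
@Serializable
data class TilePosition(val x: Int, val y: Int, val surfaceIndex: SurfaceIndex)

fun buildChunkedTilesTable(
  tilesTable: KTable<TilePosition, MapTile>,
): KTable<MapChunkDataPosition, MapChunkData> =
  tilesTable
    .groupBy(
      { pos, tile ->
        // re-key each tile by the 32x32 chunk that contains it
        val chunkKey = MapChunkDataPosition(
          MapChunkPosition(pos.x / 32, pos.y / 32), // assumed constructor shape
          pos.surfaceIndex,
        )
        KeyValue.pair(chunkKey, MapChunkData(chunkKey, setOf(tile)))
      },
      Grouped.with("tiles-grouped-by-chunk", jsonMapper.serde<MapChunkDataPosition>(), jsonMapper.serde<MapChunkData>()),
    )
    .reduce(
      { acc, next -> acc.copy(tiles = acc.tiles + next.tiles) }, // adder: merge tile sets
      { acc, old -> acc.copy(tiles = acc.tiles - old.tiles) },   // subtractor: drop retracted tiles
      materializedAs("chunked-tiles-store", jsonMapper.serde(), jsonMapper.serde()),
    )
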
Group prototypes by surface index

Then I collect all the prototypes by surface index, and aggregate them into a list.


/** Identifier for a surface (a simple wrapper, so I can use a Kotlinx Serialization serde everywhere) */
@Serializable
data class SurfaceIndex(
  val surfaceIndex: Int
)

/** Each surface has some 'prototypes' - I want this because each tile has a colour */
@Serializable
data class SurfacePrototypesData(
  val surfaceIndex: SurfaceIndex,
  val mapTilePrototypes: Set<MapTilePrototype>,
)

// get all incoming prototypes and group them by surface index,
// this works successfully
val tilePrototypesTable: KTable<SurfaceIndex, SurfacePrototypesData> =
  tilePrototypesTable()
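
Likewise, tilePrototypesTable() works, so it isn't shown; it groups prototypes by their surface index and aggregates them into one SurfacePrototypesData per surface. A hypothetical sketch of that shape (the source table, its key type, and the surfaceIndex property on MapTilePrototype are assumptions; the real function takes no parameters, but the source table is passed in here to keep the sketch self-contained):

// Hypothetical sketch - NOT the real tilePrototypesTable().
fun tilePrototypesTable(
  prototypesSource: KTable<String, MapTilePrototype>, // key type is a guess
): KTable<SurfaceIndex, SurfacePrototypesData> =
  prototypesSource
    .groupBy(
      { _, proto -> KeyValue.pair(proto.surfaceIndex, proto) }, // re-key by surface index (assumed field)
      Grouped.with("prototypes-by-surface", jsonMapper.serde<SurfaceIndex>(), jsonMapper.serde<MapTilePrototype>()),
    )
    .aggregate(
      { SurfacePrototypesData(SurfaceIndex(-1), emptySet()) },   // placeholder, overwritten by the adder
      { surface, proto, agg -> SurfacePrototypesData(surface, agg.mapTilePrototypes + proto) }, // adder
      { surface, proto, agg -> SurfacePrototypesData(surface, agg.mapTilePrototypes - proto) }, // subtractor
      materializedAs("tile-prototypes-by-surface", jsonMapper.serde(), jsonMapper.serde()),
    )
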
KTable-KTable FK join

This is the code that causes the error.

/** For each chunk, get all tiles in that chunk, and all prototypes */
@Serializable
data class ChunkTilesAndProtos(
  val chunkTiles: MapChunkData,
  val protos: SurfacePrototypesData
)

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()` - helper function to make a Serde from a Kotlinx Serialization JSON module
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )

Full stack trace

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: MyProject.processor.Topology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:131)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:105)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:186)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:54)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:29)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:182)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:179)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:87)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:136)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flushCache(CachingKeyValueStore.java:345)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:487)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1043)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1016)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1017)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.lang.ClassCastException: class MyProjectTopology$MapChunkData cannot be cast to class java.lang.String (MyProject.processor.MyProject$MapChunkData is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:99)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:69)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
... 30 common frames omitted

Versions

  • Kotlin 1.6.10
  • Kafka Streams 3.0.0
  • Kotlinx Serialization 1.3.2

Answer

Unsurprisingly, I had made a mistake earlier in the topology definition.

In the last stage of creating one of the tables, I had mapped the values, but I didn't specify the serdes.

.mapValues { _, v ->
  ChunkTilesAndProtos(v.tiles, v.protos)
}

So I changed it to specify the serdes.

.mapValues(
  "finalise-web-map-tile-chunk-aggregation",
  materializedAs("web-map-tile-chunks", jsonMapper.serde(), jsonMapper.serde())
) { _, v ->
  ChunkTilesAndProtos(v.tiles, v.protos)
}
// note: this uses extension functions from github.com/adamko-dev/kotka-streams

It wasn't easy to find this. I found it by putting a breakpoint in the constructor of AbstractStream.java (as well as other constructors), to see when the keySerde and valueSerde fields were set, and when they were not.

Sometimes a null serde is expected (I think some KTables/KStreams are 'virtual' and aren't encoded/decoded to a Kafka topic). However, I was able to find the operation that caused my problem and define the serdes there, because I was changing the value types.
