我想创建一个case class TimedGenericRecord(record: GenericRecord, timestamp: Long)
StoreBuilder[KeyValueStore[String, TimedGenericRecord]]
存储。
因此,我需要创建一个Serde[TimedGenericRecord]
.
对于GenericRecord
,卡夫卡已经提供了一个Serde,Long
也是如此。 有没有办法为案例类创建一个 Serde 并使用这些提供的 Serdes? 因为在当前设置中,您似乎只能反序列化一个完整的byte[]
,这不允许您重用提供的 Serdes。
取自Confluent Slack频道的答案:
在您的情况下,您知道
long
被序列化为 8 个字节 - 因此, 将inputBytes[0]
复制到inputBytes[inputBytes.lenght - 8]
您提供给AvroDeserializer
的新字节数组并复制 最后 8 个字节并给LongDeserializer
.在序列化路径上, 你序列化 avro 和 long,然后将两者连接成一个字节 您返回的数组。