我正试图构建一个流,该流获取一个Avro主题,进行简单的转换,然后以Avro格式将其再次发送回另一个主题,我有点拘泥于最后的序列化部分。
我已经创建了一个AVRO模式,我正在导入它并使用它来创建特定的AVRO Serde。但我不知道如何使用这个serde将电影对象序列化回AVRO。
这是流类:
class StreamsProcessor(val brokers: String, val schemaRegistryUrl: String) {
private val logger = LogManager.getLogger(javaClass)
fun process() {
val streamsBuilder = StreamsBuilder()
val avroSerde = GenericAvroSerde().apply {
configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
}
val movieAvro = SpecificAvroSerde<Movie>().apply{
configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
}
val movieAvroStream: KStream<String, GenericRecord> = streamsBuilder
.stream(movieAvroTopic, Consumed.with(Serdes.String(), avroSerde))
val movieStream: KStream<String, StreamMovie> = movieAvroStream.map {_, movieAvro ->
val movie = StreamMovie(
movieId = movieAvro["name"].toString() + movieAvro["year"].toString(),
director = movieAvro["director"].toString(),
)
KeyValue("${movie.movieId}", movie)
}
// This where I'm stuck, the call is wrong because movieStream is not a <String, movieAvro> object
movieStream.to(movieTopic, Produced.with(Serdes.String(), movieAvro))
val topology = streamsBuilder.build()
val props = Properties()
props["bootstrap.servers"] = brokers
props["application.id"] = "movies-stream"
val streams = KafkaStreams(topology, props)
streams.start()
}
}
感谢
结果流的类型为KStream<String, StreamMovie>
,因此使用的值Serde的类型应为SpecificAvroSerde<StreamMovie>
。
为什么要尝试使用SpecificAvroSerde<Movie>
?如果Movie
是所需的输出类型,则应在map
步骤中创建Movie
对象,而不是StreamMovie
对象,并相应地更改结果KStream
的值类型。
比较https://github.com/confluentinc/kafka-streams-examples/blob/5.4.1-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java