当一个主题有多个主题时,如何处理Kafka(使用Apache Beam)的Avro输入



为了使用KafkaIO使用Apache Beam处理Avro编码的消息,需要传递一个ConfluentSchemaRegistryDeserializerProvider实例作为值反序列化器。

一个典型的例子如下:

PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
.apply(KafkaIO.<Long, GenericRecord>read()
.withBootstrapServers("kafka-broker:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("http://my-local-schema-registry:8081", "my_subject"))

然而,我想消费的一些卡夫卡主题有多个不同的主题(事件类型((出于排序原因(。因此,我无法提前提供一个固定的主题名称。如何解决这一困境?

(我的目标是,最终使用BigQueryIO将这些事件推送到云中。(

你可以进行多次阅读,每个主题一次,然后将它们压平。

最新更新