以Kafka为源的结构化流中的JSON模式推理



我目前正在使用Spark Structured Steaming从Kafka主题中读取json数据。json作为字符串存储在主题中。为了实现这一点,我提供了一个硬编码的JSON模式作为StructType。我正在寻找一种在流媒体期间动态推断主题模式的好方法。

这是我的代码:(这是Kotlin,而不是通常使用的Scala(

spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "my_topic")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING)")
.select(
from_json(
col("value"),
JsonSchemaRegistry.mySchemaStructType)
.`as`("data")
)
.select("data.*")
.writeStream()
.format("my_format")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(ProcessingTime("25 seconds"))
.start("/my/path")
.awaitTermination()

这是否可能在不为每个DataFrame再次推断的情况下以一种干净的方式实现?我在找一些惯用的方法。如果模式推理在结构化流中是不可取的,我会继续对我的模式进行硬编码,但要确定。spark文档中提到了spark.sql.streaming.schemaConference选项,但我不知道如何使用它。

对于KAFKA是不可能的。花了太多时间。对于文件源,您可以。

来自手册:

流式数据帧/数据集的模式推理和分区

默认情况下,来自基于文件的源的结构化流式处理需要指定模式,而不是依赖Spark来推断自动地此限制可确保一致的架构用于流式查询,即使在失败的情况下也是如此。对于临时在用例中,可以通过设置spark.sql.streaming.schemaConference为true。

但对于不是KAFKA的文件源。

最新更新