Structured Streaming schema for Kafka JSON - query error



I am consuming a JSON stream from Kafka 2.12-3.0.0 with Spark 3.2. After parsing the JSON, I get an error in my query.

The Kafka topic streams JSONs like these:

b'{"pmu_id": 2, "time": 1642771653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 49.99, "rocof": 1}'
b'{"pmu_id": 2, "time": 1642734653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 50.00, "rocof": -1}'

The DataFrame schema:

from pyspark.sql.types import StructType, ByteType, TimestampType, StringType, FloatType

stream01Schema = StructType() \
    .add("pmu_id", ByteType()) \
    .add("time", TimestampType()).add("stream_id", ByteType()) \
    .add("analog", StringType()).add("digital", ByteType()).add("frequency", FloatType()).add("rocof", ByteType())

Building the streaming DataFrame that reads from the topic:

from pyspark.sql.functions import col, from_json

stream01DF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load() \
    .select(col("key").cast("string"), from_json(col("value").cast("string").alias("pmudata"), stream01Schema))

The printed result schema:

root
|-- key: string (nullable = true)
|-- from_json(CAST(value AS STRING) AS pmudata): struct (nullable = true)    
|    |-- pmu_id: byte (nullable = true)
|    |-- time: timestamp (nullable = true)
|    |-- stream_id: byte (nullable = true)
|    |-- analog: string (nullable = true)
|    |-- digital: byte (nullable = true)
|    |-- frequency: float (nullable = true)
|    |-- rocof: byte (nullable = true)

The test query:

testQuery = stream01DF.groupBy("pmudata.rocof").count()
testQuery.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start() \
    .awaitTermination()

The error I receive:

pyspark.sql.utils.AnalysisException: cannot resolve 'pmudata.rocof' given input columns: [from_json(CAST(value AS STRING) AS pmudata), key];

It seems you're looking for this: check your parentheses, because you want to alias the from_json() column itself to a name that you can later select/group by on.

from_json(col("value").cast("string"), stream01Schema).alias("pmudata")

Full usage is shown in the end-to-end example in this Databricks post.
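
Putting it together, the select part of the stream would look roughly like this (a sketch, keeping the reader options from the question):

from pyspark.sql.functions import col, from_json

stream01DF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load() \
    .select(col("key").cast("string"),
            from_json(col("value").cast("string"), stream01Schema).alias("pmudata"))

# The struct column is now named pmudata, so the grouping resolves:
testQuery = stream01DF.groupBy("pmudata.rocof").count()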
