为嵌套Json创建Spark结构化流模式



我想为我的结构化流作业(在python中(定义模式,但我无法以我想要的方式获得数据帧模式。

对于这个json

{
"messages": [{
"IdentityNumber": 1,
"body": {
"Alert": "This is the payload"
},
"regionNumber": 11000002
}]
}

我正在使用以下代码作为模式

schema1 = StructType([StructField("messages", ArrayType(   
StructType( 
[
StructField("body", StructType( [StructField("Alert", StringType())]) )
]
)
,True))])

但我的模式是

df->消息->身体->警报

虽然我想要这样的

df->警报

即具有单列名为alert的数据帧,该数据帧将包含作为alert出现的所有字符串消息。我应该对我定义的模式进行什么更改?

如果您正在读取有关该模式的数据,则该模式是可以的。

如果在阅读上述模式中的json后需要提取嵌套字段,只需使用点表示法即可。例如:

df.select(col("messages[0].body.alert"))

如果您需要操作和分解所有数组元素,请查看本文,该文章解释了您必须执行的不同选项:https://docs.databricks.com/_static/notebooks/transform-complex-data-types-scala.html

与本文一样,上面的答案在scala中,但大多数spark-sqlAPI都可以很容易地转移到pySpark。

最新更新