TL;DR: The following JSON path does not work for me when used with pyspark.sql.functions.get_json_object:
$.Blocks[?(@.Type=='LINE')].Confidence
Long version
I want to group an array within a single row.
For example, given the following schema:
root
 |-- id: string
 |-- payload: string
The value of payload is a string containing a JSON blob that looks like the structure below:
{
  "Blocks": [
    {
      "Type": "LINE",
      "Confidence": 90
    },
    {
      "Type": "LINE",
      "Confidence": 98
    },
    {
      "Type": "WORD",
      "Confidence": 99
    },
    {
      "Type": "PAGE",
      "Confidence": 97
    },
    {
      "Type": "PAGE",
      "Confidence": 89
    },
    {
      "Type": "WORD",
      "Confidence": 99
    }
  ]
}
I want to aggregate all the confidences by Type, so that we end up with the new column below...
{
  "id": 12345,
  "payload": "...",
  "confidence": [
    {
      "Type": "WORD",
      "Confidence": [99, 99]
    },
    {
      "Type": "PAGE",
      "Confidence": [97, 89]
    },
    {
      "Type": "LINE",
      "Confidence": [90, 98]
    }
  ]
}
To do this, I planned to use get_json_object(...) to extract the confidences for each type of block, e.g. ...
get_json_object(col("payload"), "$.Blocks[?(@.Type=='LINE')].Confidence")
But $.Blocks[?(@.Type=='LINE')].Confidence keeps returning null. Why?
I validated the path against the sample payload JSON above at https://jsonpath.curiousconcept.com/# and got the following result...
[
90,
98
]
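As a quick sanity check outside of Spark, the same filter can be written in plain Python with just the standard library (variable names here are illustrative, not from the original code):

```python
import json

payload = '''{"Blocks": [
  {"Type": "LINE", "Confidence": 90}, {"Type": "LINE", "Confidence": 98},
  {"Type": "WORD", "Confidence": 99}, {"Type": "PAGE", "Confidence": 97},
  {"Type": "PAGE", "Confidence": 89}, {"Type": "WORD", "Confidence": 99}
]}'''

# equivalent of $.Blocks[?(@.Type=='LINE')].Confidence
line_confidences = [b["Confidence"]
                    for b in json.loads(payload)["Blocks"]
                    if b["Type"] == "LINE"]
print(line_confidences)  # [90, 98]
```

This confirms the path's intent: the filter should yield [90, 98].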
If using the path above is not an option, how would we do the aggregation?
Below is a full code sample. I expect the first .show() to print [90, 98] in the confidence column.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, StructType, IntegerType
from pyspark.sql.functions import get_json_object, col


def main():
    spark = SparkSession.builder.appName('test_session').getOrCreate()
    df = spark.createDataFrame(
        [
            (
                12345,  # id
                """
                {
                  "Blocks": [
                    {"Type": "LINE", "Confidence": 90},
                    {"Type": "LINE", "Confidence": 98},
                    {"Type": "WORD", "Confidence": 99},
                    {"Type": "PAGE", "Confidence": 97},
                    {"Type": "PAGE", "Confidence": 89},
                    {"Type": "WORD", "Confidence": 99}
                  ]
                }
                """  # payload
            )
        ],
        StructType([
            StructField("id", IntegerType(), True),
            StructField("payload", StringType(), True)
        ])
    )

    # this prints out null (why?)
    df.withColumn("confidence", get_json_object(col("payload"), "$.Blocks[?(@.Type=='LINE')].Confidence")).show()

    # this prints out the correct values, [90,98,99,97,89,99]
    df.withColumn("confidence", get_json_object(col("payload"), "$.Blocks[*].Confidence")).show()


if __name__ == "__main__":
    main()
There is no official documentation on how Spark parses JSON paths, but based on its source code, it does not appear to support @ as the current object. In fact, it supports only a very limited syntax:
// parse `[*]` and `[123]` subscripts
// parse `.name` or `['name']` child expressions
// child wildcards: `..`, `.*` or `['*']`
So, if you are open to another approach, here is one with a predefined schema and the functions from_json, explode, and collect_list:
from pyspark.sql import functions as F, types as T

schema = T.StructType([
    T.StructField('Blocks', T.ArrayType(T.StructType([
        T.StructField('Type', T.StringType()),
        T.StructField('Confidence', T.IntegerType())
    ])))
])
(df
 .withColumn('json', F.from_json('payload', schema))
 .withColumn('block', F.explode('json.Blocks'))
 .select('id', 'block.*')
 .groupBy('id', 'Type')
 .agg(F.collect_list('Confidence').alias('confidence'))
 .show(10, False)
)
# +-----+----+----------+
# |id |Type|confidence|
# +-----+----+----------+
# |12345|PAGE|[97, 89] |
# |12345|WORD|[99, 99] |
# |12345|LINE|[90, 98] |
# +-----+----+----------+
Since get_json_object() does not support @, the jsonpath_ng Python module can be used to find the exact values for the input JSON path.
import json
import sys

# adjust (or remove) this path to wherever jsonpath_ng is installed in your environment
sys.path.insert(0, "/opt/Anaconda3/lib/python3.6/site-packages/")
# the [?(...)] filter syntax requires the extended parser
from jsonpath_ng.ext import parse
val = '''{
  "Blocks": [
    {"Type": "LINE", "Confidence": 90},
    {"Type": "LINE", "Confidence": 98},
    {"Type": "WORD", "Confidence": 99},
    {"Type": "PAGE", "Confidence": 97},
    {"Type": "PAGE", "Confidence": 89},
    {"Type": "WORD", "Confidence": 99}
  ]
}'''
json_data = json.loads(val)
query = [match.value for match in parse("$.Blocks[?(@.Type=='LINE')].Confidence").find(json_data)]
var = json.dumps(query)
# var = var[1:-1]
print(var)
[90, 98]
If you want the output without the [], uncomment var = var[1:-1] in the code above.
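If the goal is the full grouped structure from the question (all confidences bucketed by Type) rather than a single filtered list, the standard library alone is enough. A minimal sketch, using illustrative names (grouped, confidence) that are not from the original code:

```python
import json
from collections import defaultdict

payload = '''{"Blocks": [
  {"Type": "LINE", "Confidence": 90}, {"Type": "LINE", "Confidence": 98},
  {"Type": "WORD", "Confidence": 99}, {"Type": "PAGE", "Confidence": 97},
  {"Type": "PAGE", "Confidence": 89}, {"Type": "WORD", "Confidence": 99}
]}'''

# bucket every block's Confidence under its Type
grouped = defaultdict(list)
for block in json.loads(payload)["Blocks"]:
    grouped[block["Type"]].append(block["Confidence"])

# reshape into the desired array of {Type, Confidence} objects
confidence = [{"Type": t, "Confidence": c} for t, c in grouped.items()]
print(json.dumps(confidence))
```

Wrapped in a UDF, the same per-row logic could also be applied to the payload column directly inside Spark.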