我有一个巨大的嵌套json,如下
"evaluation_parameters": {},
"meta": {
"active_batch_definition": {
"batch_identifiers": {
"pipeline_stage": "prod",
"run_id": "run_20220224"
},
"data_asset_name": "STORES_DQ_SUITE",
"data_connector_name": "stores_connector",
"datasource_name": "stores"
},
"batch_markers": {
"ge_load_time": "20220224T054318.272571Z"
},
"batch_spec": {
"batch_data": "SparkDataFrame",
"data_asset_name": "STORES_DQ_SUITE"
},
"expectation_suite_name": "STORES_DQ_SUITE",
"great_expectations_version": "0.14.7",
"run_id": {
"run_name": "stores_template_20220224-054316",
"run_time": "2022-02-24T05:43:16.678220+00:00"
},
"validation_time": "20220224T054318.389119Z"
},
"results": [
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_to_exist",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country"
},
"meta": {}
},
"meta": {},
"result": {},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country"
},
"meta": {}
},
"meta": {},
"result": {
"element_count": 102,
"partial_unexpected_counts": [],
"partial_unexpected_index_list": null,
"partial_unexpected_list": [],
"unexpected_count": 0,
"unexpected_percent": 0.0
},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country",
"type_": "StringType"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": "StringType"
},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_to_exist",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "countray"
},
"meta": {}
},
"meta": {},
"result": {},
"success": false
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_table_row_count_to_equal",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"value": 10
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 102
},
"success": false
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_sum_to_be_between",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "active_stores",
"max_value": 1000,
"min_value": 100
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 22075.0
},
"success": false
}
],
"statistics": {
"evaluated_expectations": 6,
"success_percent": 50.0,
"successful_expectations": 3,
"unsuccessful_expectations": 3
},
"success": false
}
我想导出一个具有以下沿袭值的表-
data_source:硬编码值
run_time:meta.run_id.run_time
expectation_type:results.expectation_config.expectation_type
expectations:results.expectation_config.kwargs（字典中除batch_id以外的所有键值）
results:results.result（整个字典内容）
预期结果
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|data_source |run_time |expectation_type |expectations |results |success |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist |{"column": "country"} |{} |true |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_not_be_null |{"column": "country"} |{"element_count": 102, "partial_unexpected_counts": [], "partial_unexpected_index_list": null, "partial_unexpected_list": [], "unexpected_count": 0, "unexpected_percent": 0.0} |true |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_be_of_type |{"column": "country","type_": "StringType"} |{"observed_value": "StringType"} |true |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist |{"column": "countray"} |{} |false |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_table_row_count_to_equal |{"value": 10} |{"observed_value": 102} |false |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_sum_to_be_between |{"column": "active_stores","max_value": 1000,"min_value": 100} |{"observed_value": 22075.0} |false |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
有人能帮我做这个吗。
提前谢谢。
使用spark.read.json
函数将json转换为数据帧。
之后,它将为您提供带有父键的df作为单独的列。之后,您需要使用spark.sql.functions
的explode
函数分解结果列。有关更多详细信息,请阅读此
然后只需从分解列中选择所需的字段。
from pyspark.sql.functions import explode, lit, col, to_json

# Read the validation report. multiLine=True is required because the document
# is a single pretty-printed JSON object, not one-object-per-line NDJSON.
df = spark.read.json(json_path, multiLine=True)

# One output row per entry in the `results` array.
df = df.withColumn("exploded_results", explode(df.results))

# NOTE: the original answer selected only `meta.run_id.run_time` and `results`
# first, which dropped the `meta` column and made the later `df.meta` reference
# fail — build the final projection in a single select instead.
df = df.select(
    # Hard-coded lineage value requested by the question.
    lit("hardcoded_value").alias("data_source"),
    col("meta.run_id.run_time").alias("run_time"),
    col("exploded_results.expectation_config.expectation_type").alias("expectation_type"),
    # Drop batch_id from kwargs (Spark 3.1+: Column.dropFields) and serialize
    # the remaining struct as a JSON string, matching the expected output.
    to_json(
        col("exploded_results.expectation_config.kwargs").dropFields("batch_id")
    ).alias("expectations"),
    # Serialize the whole result struct as a JSON string ("everything as a dict").
    to_json(col("exploded_results.result")).alias("results"),
    col("exploded_results.success").alias("success"),
)