Converting several fields of a nested JSON into dictionaries in PySpark



I have a huge nested JSON like the one below:

"evaluation_parameters": {},
"meta": {
"active_batch_definition": {
"batch_identifiers": {
"pipeline_stage": "prod",
"run_id": "run_20220224"
},
"data_asset_name": "STORES_DQ_SUITE",
"data_connector_name": "stores_connector",
"datasource_name": "stores"
},
"batch_markers": {
"ge_load_time": "20220224T054318.272571Z"
},
"batch_spec": {
"batch_data": "SparkDataFrame",
"data_asset_name": "STORES_DQ_SUITE"
},
"expectation_suite_name": "STORES_DQ_SUITE",
"great_expectations_version": "0.14.7",
"run_id": {
"run_name": "stores_template_20220224-054316",
"run_time": "2022-02-24T05:43:16.678220+00:00"
},
"validation_time": "20220224T054318.389119Z"
},
"results": [
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_to_exist",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country"
},
"meta": {}
},
"meta": {},
"result": {},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country"
},
"meta": {}
},
"meta": {},
"result": {
"element_count": 102,
"partial_unexpected_counts": [],
"partial_unexpected_index_list": null,
"partial_unexpected_list": [],
"unexpected_count": 0,
"unexpected_percent": 0.0
},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country",
"type_": "StringType"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": "StringType"
},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_to_exist",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "countray"
},
"meta": {}
},
"meta": {},
"result": {},
"success": false
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_table_row_count_to_equal",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"value": 10
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 102
},
"success": false
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_sum_to_be_between",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "active_stores",
"max_value": 1000,
"min_value": 100
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 22075.0
},
"success": false
}
],
"statistics": {
"evaluated_expectations": 6,
"success_percent": 50.0,
"successful_expectations": 3,
"unsuccessful_expectations": 3
},
"success": false
}

I want to derive a table with the following lineage values -

data_source: a hardcoded value

run_time: meta.run_id.run_time

expectation_type: results.expectation_config.expectation_type

expectations: results.expectation_config.kwargs (all values in the dictionary except batch_id)

results: results.result (everything, as a dictionary)

success: results.success

Expected result:

+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|data_source        |run_time                        |expectation_type                          |expectations                                                           |results                                                                                                                                                                            |success        |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist                    |{"column": "country"}                                                  |{}                                                                                                                                                                                 |true           |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_not_be_null       |{"column": "country"}                                                  |{"element_count": 102, "partial_unexpected_counts": [], "partial_unexpected_index_list": null, "partial_unexpected_list": [], "unexpected_count": 0, "unexpected_percent": 0.0}    |true           |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_be_of_type        |{"column": "country","type_": "StringType"}                            |{"observed_value": "StringType"}                                                                                                                                                   |true           |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist                    |{"column": "countray"}                                                 |{}                                                                                                                                                                                 |false          |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_table_row_count_to_equal           |{"value": 10}                                                          |{"observed_value": 102}                                                                                                                                                            |false          |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_sum_to_be_between           |{"column": "active_stores","max_value": 1000,"min_value": 100}         |{"observed_value": 22075.0}                                                                                                                                                        |false          |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+

Could someone please help me with this?

Thanks in advance.

Convert the JSON into a DataFrame using the spark.read.json function.

That will give you a df with the parent keys as separate columns. After that, you need to explode the results column using the explode function from pyspark.sql.functions. For more details, see the documentation for explode.
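
For illustration, here is a minimal toy example (the DataFrame and column names are made up) showing how explode turns each element of an array column into its own row:

from pyspark.sql.functions import explode
toy = spark.createDataFrame([(1, ["a", "b"])], ["id", "items"])
toy.withColumn("item", explode("items")).show()
# +---+------+----+
# | id| items|item|
# +---+------+----+
# |  1|[a, b]|   a|
# |  1|[a, b]|   b|
# +---+------+----+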

Then just select the required fields from the exploded column.

from pyspark.sql.functions import explode
# multiLine=True: the whole document is a single JSON object spanning many lines
df = spark.read.json(json_path, multiLine=True)
df = df.select(df.meta.run_id.run_time.alias("run_time"), df.results)
df = df.withColumn("exploded_results", explode(df.results))
df = df.select(df.run_time,
               df.exploded_results.expectation_config.expectation_type,
               df.exploded_results.expectation_config.kwargs,
               df.exploded_results.result,
               df.exploded_results.success)
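
The snippet above still leaves batch_id inside kwargs and keeps the nested values as structs rather than dictionaries. As a fuller sketch of how to get the exact table from the question - assuming Spark 3.1+ (for Column.dropFields) and that json_path points at the file holding the JSON above - you could hardcode data_source with lit, drop batch_id from kwargs, and serialize the structs to JSON strings with to_json, which by default omits null fields, so the keys that schema inference merged in from other results elements disappear again:

from pyspark.sql.functions import explode, lit, to_json
df = spark.read.json(json_path, multiLine=True)
# one row per element of the results array
df = df.withColumn("r", explode("results"))
report = df.select(
    lit("hardcoded_value").alias("data_source"),
    df.meta.run_id.run_time.alias("run_time"),
    df.r.expectation_config.expectation_type.alias("expectation_type"),
    # dropFields (Spark 3.1+) removes batch_id before serializing
    to_json(df.r.expectation_config.kwargs.dropFields("batch_id")).alias("expectations"),
    to_json(df.r.result).alias("results"),
    df.r.success.alias("success"),
)
report.show(truncate=False)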
