>我正在创建一个火花数据帧,其中架构是从json记录推断出来的。但是,json 数据集的某些行比其他行具有更多的列,因此数据帧解析失败。无论如何,我可以用空值代替缺少额外列的记录。
raw_event_data_rdd = sc.textFile(INPUT_DATA_DIR)
pre_processed_raw_event_data_rdd = raw_event_data_rdd.map(lambda raw_event: json.loads(raw_event))
rdd_of_rows = pre_processed_raw_event_data_rdd.map(lambda x: Row(**x))
pre_processed_raw_event_data_df = sqlContext.createDataFrame(rdd_of_rows,samplingRatio=1,verifySchema=False)
我的模式非常复杂,因此我正在使用推断模式。如何使用 pyspark 处理此类 json 数据集并将其转换为数据帧。
错误:
:org.apache.spark.SparkException:作业由于阶段故障而中止: 阶段 2.0 中的任务 0 失败了 4 次,最近一次失败:丢失的任务 0.3 在阶段 2.0(TID 255,adpiaddn-1dd28x-24a87926.us-east-1.amazon.com, executor 49): java.lang.IllegalStateException: 输入行没有 架构所需的预期值数。8 个字段是 必需,同时提供 7 个值。
sqlContext.read.json
将 json 直接加载到结构化数据帧中。
假设我们有样本数据在INPUT_DATA_DIR
{"a":1,"b":1}
{"a":2,"b":2,"c":2}
然后
from pyspark.sql import Row
import json
raw_event_data_rdd = sc.textFile(INPUT_DATA_DIR)
pre_processed_raw_event_data_rdd = raw_event_data_rdd.map(lambda raw_event: json.loads(raw_event))
rdd_of_rows = pre_processed_raw_event_data_rdd.map(lambda x: Row(**x))
pre_processed_raw_event_data_df = sqlContext.createDataFrame(rdd_of_rows,samplingRatio=1,verifySchema=False)
pre_processed_raw_event_data_df.show()
产生与您相同的错误:
原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数。 3 个字段为必填字段,同时提供 2 个值。
但
pre_processed_raw_event_data_df = sqlContext.read.json(INPUT_DATA_DIR)
pre_processed_raw_event_data_df.show()
+---+---+----+
| a| b| c|
+---+---+----+
| 1| 1|null|
| 2| 2| 2|
+---+---+----+