JSON 文件解析在创建 Spark 数据帧时会忽略格式错误的记录



>我正在创建一个火花数据帧,其中架构是从json记录推断出来的。但是,json 数据集的某些行比其他行具有更多的列,因此数据帧解析失败。无论如何,我可以用空值代替缺少额外列的记录。

raw_event_data_rdd = sc.textFile(INPUT_DATA_DIR)
pre_processed_raw_event_data_rdd = raw_event_data_rdd.map(lambda raw_event: json.loads(raw_event))
rdd_of_rows = pre_processed_raw_event_data_rdd.map(lambda x: Row(**x))
pre_processed_raw_event_data_df = sqlContext.createDataFrame(rdd_of_rows,samplingRatio=1,verifySchema=False)

我的模式非常复杂,因此我正在使用推断模式。如何使用 pyspark 处理此类 json 数据集并将其转换为数据帧。

错误:

:org.apache.spark.SparkException:作业由于阶段故障而中止: 阶段 2.0 中的任务 0 失败了 4 次,最近一次失败:丢失的任务 0.3 在阶段 2.0(TID 255,adpiaddn-1dd28x-24a87926.us-east-1.amazon.com, executor 49): java.lang.IllegalStateException: 输入行没有 架构所需的预期值数。8 个字段是 必需,同时提供 7 个值。

您可以使用

sqlContext.read.json将 json 直接加载到结构化数据帧中。

假设我们有样本数据在INPUT_DATA_DIR

{"a":1,"b":1}
{"a":2,"b":2,"c":2}

然后

from pyspark.sql import Row
import json
raw_event_data_rdd = sc.textFile(INPUT_DATA_DIR)
pre_processed_raw_event_data_rdd = raw_event_data_rdd.map(lambda raw_event: json.loads(raw_event))
rdd_of_rows = pre_processed_raw_event_data_rdd.map(lambda x: Row(**x))
pre_processed_raw_event_data_df = sqlContext.createDataFrame(rdd_of_rows,samplingRatio=1,verifySchema=False)
pre_processed_raw_event_data_df.show()

产生与您相同的错误:

原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数。 3 个字段为必填字段,同时提供 2 个值。

pre_processed_raw_event_data_df = sqlContext.read.json(INPUT_DATA_DIR)
pre_processed_raw_event_data_df.show()
    +---+---+----+
    |  a|  b|   c|
    +---+---+----+
    |  1|  1|null|
    |  2|  2|   2|
    +---+---+----+

相关内容

  • 没有找到相关文章

最新更新