在允许模式下读取json文件-PySpark 2.3



我有一个数据作业来读取一堆json文件,其中一些文件中的一些json行可能已损坏(无效json(。以下是代码:

df = spark.read 
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json("hdfs://someLocation/")

发生在我身上的事情是,如果我试图用上面的代码读取一个完全完美的文件(没有损坏的记录(,则根本不会添加此列。

我在这里的要求是添加这个"_corrupt_ record";列,而不管json文件是否有损坏的记录。如果文件没有任何损坏的记录,则该字段的所有值都应为null。

您只需检查_corrupt_record列是否存在于df中,如果不存在,则手动添加。

import pyspark.sql.functions as F
if '_corrupt_record' not in df.columns:
df = df.withColumn('_corrupt_record', F.lit(None))

最新更新