我有一个数据作业来读取一堆json文件,其中一些文件中的一些json行可能已损坏(无效json(。以下是代码:
df = spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.json("hdfs://someLocation/")
发生在我身上的事情是,如果我试图用上面的代码读取一个完全完美的文件(没有损坏的记录(,则根本不会添加此列。
我在这里的要求是添加这个"_corrupt_ record";列,而不管json文件是否有损坏的记录。如果文件没有任何损坏的记录,则该字段的所有值都应为null。
您只需检查_corrupt_record
列是否存在于df
中,如果不存在,则手动添加。
import pyspark.sql.functions as F
if '_corrupt_record' not in df.columns:
df = df.withColumn('_corrupt_record', F.lit(None))