我正在加载许多版本的JSON文件来spark DataFrame。有些文件包含列A和B和一些A,B,C或A,C..
如果我运行这个命令
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT A,B,C FROM table")
加载几个后,我可以得到错误"列不存在",我只加载了不持有列C的文件。如何将此值设置为null
而不是得到错误?
DataFrameReader.json
方法提供了可选的模式参数,您可以在这里使用。如果您的模式很复杂,最简单的解决方案是重用从包含所有字段的文件中推断出来的一个:
df_complete = spark.read.json("complete_file")
schema = df_complete.schema
df_with_missing = spark.read.json("df_with_missing", schema)
# or
# spark.read.schema(schema).("df_with_missing")
如果你知道schema,但是由于某些原因你不能使用上面的,你必须从头开始创建它。
schema = StructType([
StructField("A", LongType(), True), ..., StructField("C", LongType(), True)])
一如既往,确保在加载数据后执行一些质量检查。
示例(注意所有字段都是nullable
):
from pyspark.sql.types import *
schema = StructType([
StructField("x1", FloatType()),
StructField("x2", StructType([
StructField("y1", DoubleType()),
StructField("y2", StructType([
StructField("z1", StringType()),
StructField("z2", StringType())
]))
])),
StructField("x3", StringType()),
StructField("x4", IntegerType())
])
spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema).printSchema()
## root
## |-- x1: float (nullable = true)
## |-- x2: struct (nullable = true)
## | |-- y1: double (nullable = true)
## | |-- y2: struct (nullable = true)
## | | |-- z1: string (nullable = true)
## | | |-- z2: string (nullable = true)
## |-- x3: string (nullable = true)
## |-- x4: integer (nullable = true)
spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema).first()
## Row(x1=None, x2=None, x3=None, x4=1)
spark.read.json(sc.parallelize(["""{"x3": "foo", "x1": 1.0}"""]), schema).first()
## Row(x1=1.0, x2=None, x3='foo', x4=None)
spark.read.json(sc.parallelize(["""{"x2": {"y2": {"z2": "bar"}}}"""]), schema).first()
## Row(x1=None, x2=Row(y1=None, y2=Row(z1=None, z2='bar')), x3=None, x4=None)
重要:
此方法仅适用于JSON源,取决于实现细节。