我在 pysparkdf
和data
中有两个数据帧。架构如下所示
>>> df.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- nation: string (nullable = true)
|-- Date: timestamp (nullable = false)
|-- ZipCode: integer (nullable = true)
|-- car: string (nullable = true)
|-- van: string (nullable = true)
>>> data.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- nation: string (nullable = true)
|-- date: string (nullable = true)
|-- zipcode: integer (nullable = true)
现在,我想通过比较两个架构将列 car 和 van 添加到我的data
数据框中。
如果列相同,我还想比较两个数据框,则不执行任何操作,但如果列不同,则将列添加到没有列的数据框中。
我们如何在 pyspark 中实现这一目标。
仅供参考,我正在使用火花 1.6
将列添加到数据框后。新添加的数据框中这些列的值应为空。
例如,这里我们要向数据框添加列
data
因此数据框中的 Car 和 Van 列应包含空值,但 DF 数据框中的相同列应具有其原始值如果要添加的新列超过 2 个,会发生什么情况
由于模式不是,而是由结构字段列表组成的结构类型,我们可以检索字段列表,以比较和查找缺少的列,
df_schema = df.schema.fields
data_schema = data.schema.fields
df_names = [x.name.lower() for x in df_scehma]
data_names = [x.name.lower() for x in data_schema]
if df_schema <> data_schema:
col_diff = set(df_names) ^ set(data_names)
col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if ((x[0] is not None and x[0].name.lower() in col_diff) or x[1].name.lower() in col_diff)]
for i in col_list:
if i[0] in df_names:
data = data.withColumn("%s"%i[0],lit(None).cast(i[1]))
else:
df = df.withColumn("%s"%i[0],lit(None).cast(i[1]))
else:
print "Nothing to do"
您提到如果没有空值,则添加列,但您的架构差异是可为空的列,因此未使用该检查。如果需要,请添加检查可为空,如下所示,
col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if (x[0].name.lower() in col_diff or x[1].name.lower() in col_diff) and not x.nullable]
请查看文档以获取有关 StructType 和 StructFields 的更多信息, https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType
如果必须对多个表执行此操作,则可能需要稍微泛化一下代码。此代码采用不匹配的源列中的第一个非 null 值,以在目标表中创建新列。
from pyspark.sql.functions import lit, first
def first_non_null(f,t): # find the first non-null value of a column
return f.select(first(f[t], ignorenulls=True)).first()[0]
def match_type(f1,f2,miss): # add missing column to the target table
for i in miss:
try:
f1 = f1.withColumn(i, lit(first_non_null(f2,i)))
except:
pass
try:
f2 = f2.withColumn(i, lit(first_non_null(f1,i)))
except:
pass
return f1, f2
def column_sync_up(d1,d2): # test if the matching requirement is met
missing = list(set(d1.columns) ^ set(d2.columns))
if len(missing)>0:
return match_type(d1,d2,missing)
else:
print "Columns Match!"
df1, df2 = column_sync_up(df1,df2) # reuse as necessary