How to dynamically add multiple columns based on a filter condition



After comparing two DataFrames with the code below, I am trying to dynamically create multiple columns based on a filter condition.

source_df
+---+-----+-----+------+
|key|val11|val12|date  |
+---+-----+-----+------+
|abc|  1.1| john|2-3-21|
|def|  3.0| dani|2-2-21|
+---+-----+-----+------+

dest_df
+---+-----+-----+------+
|key|val11|val12|date  |
+---+-----+-----+------+
|abc|  2.1| jack|2-3-21|
|def|  3.0| dani|2-2-21|
+---+-----+-----+------+
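
(For reproducibility, the two DataFrames can be built roughly as follows; this is a sketch and the schemas are only assumed from the tables above.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed sample data, matching the tables shown above
source_df = spark.createDataFrame(
    [("abc", 1.1, "john", "2-3-21"), ("def", 3.0, "dani", "2-2-21")],
    ["key", "val11", "val12", "date"])
dest_df = spark.createDataFrame(
    [("abc", 2.1, "jack", "2-3-21"), ("def", 3.0, "dani", "2-2-21")],
    ["key", "val11", "val12", "date"])
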
from pyspark.sql import functions as F

columns = source_df.columns[1:]
joined_df = source_df.join(dest_df, 'key', 'full')
for column in columns:
    column_name = "difference_in_" + str(column)
    report = joined_df \
        .filter(source_df[column] != dest_df[column]) \
        .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                          F.lit(',dst:'), dest_df[column], F.lit(']')))

The output I expect is

#Expected
+---+-------------------+-------------------+
|key|difference_in_val11|difference_in_val12|
+---+-------------------+-------------------+
|abc|  [src:1.1,dst:2.1]|[src:john,dst:jack]|
+---+-------------------+-------------------+

but I only get the result for one column

#Actual
+---+-------------------+
|key|difference_in_val12|
+---+-------------------+
|abc|[src:john,dst:jack]|
+---+-------------------+

How can I dynamically generate multiple columns based on the filter condition?

DataFrames are immutable objects. That being said, you need to use the DataFrame generated in the first iteration to create the next one. Something like the following:

from pyspark.sql import functions as F

columns = source_df.columns[1:]
joined_df = source_df.join(dest_df, 'key', 'full')
for column in columns:
    if column != columns[-1]:
        column_name = "difference_in_" + str(column)
        # build the intermediate result from the joined DataFrame
        report = joined_df \
            .filter(source_df[column] != dest_df[column]) \
            .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                              F.lit(',dst:'), dest_df[column], F.lit(']')))
    else:
        column_name = "difference_in_" + str(column)
        # for the last column, build on the DataFrame produced above,
        # so both difference columns are kept
        report1 = report \
            .filter(source_df[column] != dest_df[column]) \
            .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                              F.lit(',dst:'), dest_df[column], F.lit(']')))

report1.show()
#report.show()

Output:

+---+-----+-----+-----+-----+-------------------+-------------------+
|key|val11|val12|val11|val12|difference_in_val11|difference_in_val12|
+---+-----+-----+-----+-----+-------------------+-------------------+
|abc|  1.1| john|  2.1| jack|  [src:1.1,dst:2.1]|[src:john,dst:jack]|
+---+-----+-----+-----+-----+-------------------+-------------------+
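
The if/else above only exists to handle the last column separately; for an arbitrary number of columns, the same idea can be written as a single chained loop. A minimal sketch, assuming the same source_df, dest_df and naming convention as above (like the code above, it keeps only rows where every compared column differs):

from pyspark.sql import functions as F

# Sketch: reassign the same DataFrame on each iteration, so every
# filter/withColumn builds on the result of the previous column.
columns = source_df.columns[1:]
report = source_df.join(dest_df, 'key', 'full')
for column in columns:
    column_name = "difference_in_" + str(column)
    report = report \
        .filter(source_df[column] != dest_df[column]) \
        .withColumn(column_name, F.concat(F.lit('[src:'), source_df[column],
                                          F.lit(',dst:'), dest_df[column], F.lit(']')))

report.show()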

You could also do this with a union of the two DataFrames, and then collect a list only when the collect_set size is greater than 1; this avoids joining the DataFrames:

from pyspark.sql import functions as F

cols = source_df.drop("key").columns
output = (source_df.withColumn("ref", F.lit("src:"))
          .unionByName(dest_df.withColumn("ref", F.lit("dst:")))
          .groupBy("key")
          .agg(*[F.when(F.size(F.collect_set(i)) > 1,
                        F.collect_list(F.concat("ref", i))).alias(i)
                 for i in cols])
          .dropna(subset=cols, how='all'))

output.show()
+---+------------------+--------------------+
|key|             val11|               val12|
+---+------------------+--------------------+
|abc|[src:1.1, dst:2.1]|[src:john, dst:jack]|
+---+------------------+--------------------+
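
If the bracketed string format from the expected output is needed, the array columns can be flattened afterwards. A sketch assuming Spark 2.4+ (for array_join) and that collect_list preserves the src/dst order coming from the union:

from pyspark.sql import functions as F

# Hypothetical follow-up step: turn an array such as ["src:1.1", "dst:2.1"]
# into the string "[src:1.1,dst:2.1]" and rename the columns accordingly.
formatted = output.select(
    "key",
    *[F.concat(F.lit("["), F.array_join(F.col(c), ","), F.lit("]"))
        .alias("difference_in_" + c)
      for c in cols])
formatted.show(truncate=False)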
