Hi, I have a sparse DataFrame that was loaded with the mergeSchema option:
DF
name  A1    A2    B1    B2    .....  partitioned_name
A     1     1     null  null         partition_a
B     2     2     null  null         partition_a
A     null  null  3     4            partition_b
B     null  null  3     4            partition_b
and I want to collapse it to:
DF
name  A1  A2  B1  B2  .....
A     1   1   3   4
B     2   2   3   4
Is there a good way to do this without a join (for efficiency), and without dropping to RDDs (the data is huge)? I was thinking of something like pandas concat(axis=1), since all the tables are sorted.
If the schema repeats and you don't mind hard-coding the column names:
import pyspark.sql.functions as F

# Rebuild the example frame; None stands in for the missing values.
df = (
    spark.createDataFrame(
        [
            ('A', '1', '1', None, None, 'partition_a'),
            ('B', '2', '2', None, None, 'partition_a'),
            ('A', None, None, '3', '4', 'partition_b'),
            ('B', None, None, '3', '4', 'partition_b'),
        ],
        ['name', 'A1', 'A2', 'B1', 'B2', 'partitioned_name']
    )
    .withColumn('A1', F.col('A1').cast('integer'))
    .withColumn('A2', F.col('A2').cast('integer'))
    .withColumn('B1', F.col('B1').cast('integer'))
    .withColumn('B2', F.col('B2').cast('integer'))
)
df.show()
# sum() ignores nulls and each value appears in exactly one partition,
# so grouping by name merges the sparse rows into one.
(
    df
    .groupby('name')
    .agg(F.sum('A1').alias('A1'),
         F.sum('A2').alias('A2'),
         F.sum('B1').alias('B1'),
         F.sum('B2').alias('B2'))
    .show()
)
# df.show():
# +----+----+----+----+----+----------------+
# |name| A1| A2| B1| B2|partitioned_name|
# +----+----+----+----+----+----------------+
# | A| 1| 1|null|null| partition_a|
# | B| 2| 2|null|null| partition_a|
# | A|null|null| 3| 4| partition_b|
# | B|null|null| 3| 4| partition_b|
# +----+----+----+----+----+----------------+
# The aggregated result:
# +----+---+---+---+---+
# |name| A1| A2| B1| B2|
# +----+---+---+---+---+
# | A| 1| 1| 3| 4|
# | B| 2| 2| 3| 4|
# +----+---+---+---+---+
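If you'd rather not hard-code the column names, the same aggregation can be built dynamically from df.columns. A minimal sketch, assuming the only non-value columns are name and partitioned_name:

cols_to_agg = [c for c in df.columns if c not in ('name', 'partitioned_name')]
df.groupby('name').agg(*[F.sum(c).alias(c) for c in cols_to_agg]).show()

sum() works here because each (name, column) pair is non-null in exactly one partition; if the value columns weren't numeric, F.first(c, ignorenulls=True) would collapse them the same way. Either way this costs a single groupBy shuffle rather than a chain of joins.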