我想在火花数据框架中汇总不同的列。
代码
from pyspark.sql import functions as F
cols = ["A.p1","B.p1"]
df = spark.createDataFrame([[1,2],[4,89],[12,60]],schema=cols)
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
为什么不接近2。&#3。不起作用?我在火花2.2
因为,
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
在这里,您使用的是python内构建的总和函数,该函数将其视为输入,因此可以正常工作。https://docs.python.org/2/library/functions.html#sum
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
在这里,您使用的是Pyspark Sum函数,该功能以列为输入,但您正在尝试将其在行级别上获取。http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sum
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
在这里,df.Select()返回数据框架并尝试通过数据框架汇总。在这种情况下,我认为,您必须迭代行并在上面应用总和。
tl; dr builtins.sum
很好。
按照您的评论:
使用本机Python sum()无法从火花优化中受益。那么,这样做的火花方式
和
它不是Pypark功能
我可以看到您做出错误的假设。
让我们分解问题:
[df[col] for col in ["`A.p1`","`B.p1`"]]
创建Columns
的列表:
[Column<b'A.p1'>, Column<b'B.p1'>]
让我们称其为 iterable
。
sum
通过获取此列表的元素并调用__add__
方法(+
)来减少输出。命令等效是:
accum = iterable[0]
for element in iterable[1:]:
accum = accum + element
这给出了Column
:
Column<b'(A.p1 + B.p1)'>
与调用
相同df["`A.p1`"] + df["`B.p1`"]
没有触摸数据,并且在评估时,这是所有火花优化的好处。
从列表中添加多个列中的多列
我尝试了很多方法,以下是我的观察:
- pyspark的
sum
功能不支持列添加(Pyspark版本2.3.1) - 内置的Python的
sum
功能对某些人有效,但给其他人错误(可能是由于名称冲突)
在您的第三种方法中,表达式(python的sum
函数)正在返回Pyspark DataFrame。
因此,可以使用pyspark中的expr
函数来实现多个列的添加,该函数以计算为输入的表达式。
from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))
这为我们提供了所需的列总和。我们还可以使用任何其他复杂表达式获取其他输出。