是什么是在Pyspark中列表中汇总不同数据帧列的正确方法



我想在火花数据框架中汇总不同的列。

代码

from pyspark.sql import functions as F
cols = ["A.p1","B.p1"]
df = spark.createDataFrame([[1,2],[4,89],[12,60]],schema=cols)
# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))
#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))

为什么不接近2。&#3。不起作用?我在火花2.2

因为,

# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

在这里,您使用的是python内构建的总和函数,该函数将其视为输入,因此可以正常工作。https://docs.python.org/2/library/functions.html#sum

#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

在这里,您使用的是Pyspark Sum函数,该功能以列为输入,但您正在尝试将其在行级别上获取。http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sum

#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))

在这里,df.Select()返回数据框架并尝试通过数据框架汇总。在这种情况下,我认为,您必须迭代行并在上面应用总和。

tl; dr builtins.sum很好。


按照您的评论:

使用本机Python sum()无法从火花优化中受益。那么,这样做的火花方式

它不是Pypark功能

我可以看到您做出错误的假设。

让我们分解问题:

[df[col] for col in ["`A.p1`","`B.p1`"]]

创建Columns的列表:

[Column<b'A.p1'>, Column<b'B.p1'>]

让我们称其为 iterable

sum通过获取此列表的元素并调用__add__方法(+)来减少输出。命令等效是:

accum = iterable[0]
for element in iterable[1:]:
    accum = accum + element

这给出了Column

Column<b'(A.p1 + B.p1)'>

与调用

相同
df["`A.p1`"] + df["`B.p1`"]

没有触摸数据,并且在评估时,这是所有火花优化的好处。

从列表中添加多个列中的多列

我尝试了很多方法,以下是我的观察:

  1. pyspark的sum功能不支持列添加(Pyspark版本2.3.1)
  2. 内置的Python的sum功能对某些人有效,但给其他人错误(可能是由于名称冲突)

在您的第三种方法中,表达式(python的sum函数)正在返回Pyspark DataFrame。

因此,可以使用pyspark中的expr函数来实现多个列的添加,该函数以计算为输入的表达式。

from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))

这为我们提供了所需的列总和。我们还可以使用任何其他复杂表达式获取其他输出。

相关内容

  • 没有找到相关文章

最新更新