在PySpark数据框架中添加列sum作为新列



我正在使用PySpark,我有一个带有一堆数字列的Spark数据框架。我想添加一列,它是所有其他列的和。

假设我的数据框有列"a"、"b"one_answers"c"。我知道我可以这样做:

df.withColumn('total_col', df.a + df.b + df.c)

问题是我不想单独键入每一列并添加它们,特别是当我有很多列时。我希望能够自动执行此操作,或者通过指定要添加的列名列表来执行此操作。还有其他方法可以做到这一点吗?

这并不明显。我没有看到spark Dataframes API中定义的基于行的列和。

<标题> 版本2

这可以用一种相当简单的方式完成:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns是由pyspark提供的,作为一个字符串列表,给出了Spark数据框中的所有列名。对于不同的总和,您可以提供任何其他列名列表。

我没有尝试这作为我的第一个解决方案,因为我不确定它会如何表现。

<标题> 第1版

这太复杂了,但是效果很好。

你可以这样做:

  1. 使用df.columns获取列的名称列表
  2. 使用该名称列表来创建列列表
  3. 将该列表传递给将以折叠类型函数方式调用列的重载add函数的东西

使用python的reduce,了解操作符重载的工作原理,以及这里列的pyspark代码:

def column_add(a,b):
     return  a.__add__(b)
newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))

注意这是一个python reduce,而不是spark RDD reduce,并且reduce的第二个参数中的括号项需要括号,因为它是一个列表生成器表达式。

测试,工作!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]

最直接的方法是使用expr函数

from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
解决方案
newdf = df.withColumn('total', sum(df[col] for col in df.columns))

由@Paul works发布。尽管如此,我还是得到了错误,正如我看到的许多其他错误一样,

TypeError: 'Column' object is not callable

一段时间后,我发现了问题(至少在我的情况下)。问题是我之前导入了一些pyspark函数,行为

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

所以这行导入了sum pyspark命令,而df.withColumn('total', sum(df[col] for col in df.columns))应该使用正常的python sum函数。

可以使用del sum删除pyspark函数的引用。

否则,在我的例子中,我将导入改为

import pyspark.sql.functions as F

,然后引用函数为F.sum

将列表中的多个列相加为一列

PySpark的sum函数不支持列添加。这可以使用expr函数来实现。

from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))

我的问题类似于上面(更复杂一点),因为我必须添加连续列和作为PySpark数据框架中的新列。这种方法使用了上面Paul版本1中的代码:

import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)
                              ,(6,1,-4),(0,2,-2),(6,4,1)
                              ,(4,5,2),(5,-3,-5),(6,4,-1)]
                              ,schema=['x1','x2','x3'])
df.show()
+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+
colnames=df.columns

添加累加和(连续)的新列:

for i in range(0,len(colnames)):
    colnameLst= colnames[0:i+1]
    colname = 'cm'+ str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show ()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

添加的"累计和"列如下:

cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
df = spark.createDataFrame([("linha1", "valor1", 2), ("linha2", "valor2", 5)], ("Columna1", "Columna2", "Columna3"))
df.show()
+--------+--------+--------+
|Columna1|Columna2|Columna3|
+--------+--------+--------+
|  linha1|  valor1|       2|
|  linha2|  valor2|       5|
+--------+--------+--------+
df = df.withColumn('DivisaoPorDois', df[2]/2)
df.show()
+--------+--------+--------+--------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|
+--------+--------+--------+--------------+
|  linha1|  valor1|       2|           1.0|
|  linha2|  valor2|       5|           2.5|
+--------+--------+--------+--------------+
df = df.withColumn('Soma_Colunas', df[2]+df[3])
df.show()
+--------+--------+--------+--------------+------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|Soma_Colunas|
+--------+--------+--------+--------------+------------+
|  linha1|  valor1|       2|           1.0|         3.0|
|  linha2|  valor2|       5|           2.5|         7.5|
+--------+--------+--------+--------------+------------+

一个非常简单的方法是使用select而不是withcolumn,如下所示:

df = df.select('*', (col("a")+col("b")+col('c).alias("total"))

这应该会给你所需的金额,并根据需求进行微小的更改

下面的方法适合我:

  1. 导入sql函数
    从pyspark。sql import functions as F
  2. 使用F.expr(list_of_columns)
    data_frame.withColumn('Total_Sum',F.expr('col_name1+col_name2+..col_namen)

相关内容

  • 没有找到相关文章

最新更新