使用scala基于Spark DataFrame中现有列的聚合添加新列



我有一个DataFrame,如下所示。我需要在现有列的基础上创建一个新列。

col1 col2
a      1
a      2
b      1
c      1
d      1
d      2

输出数据帧看起来像这个

col1  col2 col3 col4
a      1   1      2
a      2   1      2
b      1   0      1
c      1   0      1
d      1   1      2
d      2   1      2

如果col1>1的计数和col4是col2的最大值,则我用来查找col3的逻辑是

我熟悉如何在sql中实现它。但使用数据帧DSL很难找到解决方案。如有任何帮助,我们将不胜感激。感谢

groupBy col1并聚合以获得count和max。然后您可以将其与原始数据帧重新连接以获得所需的结果

val df2 = df1.groupBy("col1").agg(count() as col3, max("col2") as col4) 
val df3 = df1.join(df2, "col1")

spark df具有名为带列可以根据需要添加任意数量的派生列。但该列并没有添加到现有DF中,而是创建了一个添加了列的新DF。

例如,在数据中添加静态日期

val myFormattedData = myData.withColumn("batchdate",addBatchDate(myData("batchdate")))
val addBatchDate = udf { (BatchDate: String) => "20160101" }

要添加col3,您可以使用withcolumn+when/others:

val df2 = df.withColumn("col3",when($"col2" > 1, 1).otherwise(0))

要添加col4,已经提到的groupBy/max+join应该完成以下工作:

val df3 = df2.join(df.groupBy("col1").max("col2"), "col1")

要在没有联接的情况下实现这一点,需要使用countmax作为窗口函数。这需要使用Window创建一个窗口,并告诉countmax在此窗口上操作。

from pyspark.sql import Window, functions as fn
df = sc.parallelize([
    {'col1': 'a', 'col2': 1},
    {'col1': 'a', 'col2': 2},
    {'col1': 'b', 'col2': 1},
    {'col1': 'c', 'col2': 1},
    {'col1': 'd', 'col2': 1},
    {'col1': 'd', 'col2': 2}
]).toDF()
col1_window = Window.partitionBy('col1')
df = df.withColumn('col3', fn.when(fn.count('col1').over(col1_window) > 1, 1).otherwise(0))
df = df.withColumn('col4', fn.max('col2').over(col1_window))
df.orderBy(['col1', 'col2']).show()

相关内容

  • 没有找到相关文章

最新更新