Spark: create new columns containing the min and max of corresponding values from other columns



Suppose I have the following data:

import spark.implicits._

val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100))

val df_1 = simpleData.toDF("employee_name", "department", "salary")
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+

Ideally, I want to add the columns min_department_salary, max_department_salary, min_salary_employee_name, and max_salary_employee_name to the original DataFrame. For each row, this would tell me the department's minimum and maximum salary and who earns them.

So the first row would become James, Sales, 3000, 3000, 4600, James, Michael.

What I have so far is:

val df_1_5 = df_1.groupBy('department)
.agg(min('salary).as("min_department_salary"), max('salary).as("max_department_salary"))
+----------+---------------------+---------------------+
|department|min_department_salary|max_department_salary|
+----------+---------------------+---------------------+
|     Sales|                 3000|                 4600|
|   Finance|                 3000|                 3900|
| Marketing|                 2000|                 3000|
+----------+---------------------+---------------------+

This isn't quite there yet. I have tried joining it back to the original df, but I would like to avoid a join because my DataFrame is fairly large.

You can use a struct to carry the other column along:

import org.apache.spark.sql.functions._

df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
.groupBy('department)
.agg(min("sal-name").as("min"), max("sal-name").as("max"))
.select($"department", $"min.*", $"max.*")
.toDF("department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
.show(false)

Output:

+----------+-------+------------+-------+------------+
|department|min_sal|min_sal_name|max_sal|max_sal_name|
+----------+-------+------------+-------+------------+
|Sales     |3000   |James       |4600   |Michael     |
|Finance   |3000   |Maria       |3900   |Jen         |
|Marketing |2000   |Kumar       |3000   |Jeff        |
+----------+-------+------------+-------+------------+
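This works because Spark compares structs field by field, left to right: min/max on struct($"salary", $"employee_name") orders primarily by salary and only falls back to employee_name on a salary tie. A minimal sketch (the demo data here is made up, not from the question):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical tie on salary: the struct comparison falls back to the
// second field, so min would pick (3000, "Adam") over (3000, "James").
val demo = Seq((3000, "James"), (3000, "Adam"), (4600, "Michael"))
  .toDF("salary", "employee_name")

demo.agg(min(struct($"salary", $"employee_name")).as("min_pair")).show(false)
```

Because the salary is the first struct field, the name never overrides the salary ordering; it only decides ties.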

If you want to keep all the rows, you can use a window function instead of groupBy:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("department")
df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
.withColumn("min", min("sal-name").over(window))
.withColumn("max", max("sal-name").over(window))
.select($"employee_name", $"department", $"min.*", $"max.*")
.toDF("employee_name", "department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
.show(false)

Output:

+-------------+----------+-------+------------+-------+------------+
|employee_name|department|min_sal|min_sal_name|max_sal|max_sal_name|
+-------------+----------+-------+------------+-------+------------+
|James        |Sales     |3000   |James       |4600   |Michael     |
|Michael      |Sales     |3000   |James       |4600   |Michael     |
|Robert       |Sales     |3000   |James       |4600   |Michael     |
|James        |Sales     |3000   |James       |4600   |Michael     |
|Saif         |Sales     |3000   |James       |4600   |Michael     |
|Maria        |Finance   |3000   |Maria       |3900   |Jen         |
|Scott        |Finance   |3000   |Maria       |3900   |Jen         |
|Jen          |Finance   |3000   |Maria       |3900   |Jen         |
|Jeff         |Marketing |2000   |Kumar       |3000   |Jeff        |
|Kumar        |Marketing |2000   |Kumar       |3000   |Jeff        |
+-------------+----------+-------+------------+-------+------------+

Use window aggregate functions instead of regular aggregate functions.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Order the window by salary and extend the frame over the whole partition,
// so first/last reliably pick the lowest- and highest-paid employees
// instead of depending on a prior sort of the DataFrame.
val byDept = Window.partitionBy($"department").orderBy($"salary")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df2 = df_1
.withColumn("min_department_salary", min("salary") over byDept)
.withColumn("max_department_salary", max("salary") over byDept)
.withColumn("min_salary_employee_name", first("employee_name") over byDept)
.withColumn("max_salary_employee_name", last("employee_name") over byDept)
.select("employee_name", "department", "salary",
"min_department_salary", "max_department_salary",
"min_salary_employee_name", "max_salary_employee_name")
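If you are on Spark 3.3 or later, the min_by/max_by aggregate functions in the Scala API express "the employee_name at the min/max salary" directly, and like other aggregates they can be used over a window. A sketch, assuming Spark 3.3+:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("department")

// min_by/max_by return the employee_name of the row where salary is
// minimal/maximal within the window, with no need for a struct or sort.
val df3 = df_1
  .withColumn("min_department_salary", min($"salary").over(w))
  .withColumn("max_department_salary", max($"salary").over(w))
  .withColumn("min_salary_employee_name", min_by($"employee_name", $"salary").over(w))
  .withColumn("max_salary_employee_name", max_by($"employee_name", $"salary").over(w))
```

On a salary tie, which name min_by/max_by returns is not deterministic, whereas the struct approach breaks ties alphabetically by name.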
