Suppose I have the following DataFrame:
import spark.implicits._
val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100))
val df_1 = simpleData.toDF("employee_name", "department", "salary")
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
| James| Sales| 3000|
| Michael| Sales| 4600|
| Robert| Sales| 4100|
| Maria| Finance| 3000|
| James| Sales| 3000|
| Scott| Finance| 3300|
| Jen| Finance| 3900|
| Jeff| Marketing| 3000|
| Kumar| Marketing| 2000|
| Saif| Sales| 4100|
+-------------+----------+------+
Ideally, I would like to add the columns min_department_salary, max_department_salary, min_salary_employee_name, and max_salary_employee_name to the original DataFrame, so that every row shows the department's minimum and maximum salary and who earns each. The first row would then be James, Sales, 3000, 3000, 4600, James, Michael.
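In plain-Scala terms (not Spark), here is a small sketch of the result I am after, using the same rows as above:

```scala
// Plain-Scala sketch (not Spark) of the desired result.
val data = Seq(
  ("James", "Sales", 3000), ("Michael", "Sales", 4600), ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000), ("James", "Sales", 3000), ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100))

// Per-department (min salary, max salary, min earner, max earner).
val byDept = data.groupBy(_._2).map { case (dept, rows) =>
  dept -> (rows.minBy(_._3)._3, rows.maxBy(_._3)._3,
           rows.minBy(_._3)._1, rows.maxBy(_._3)._1)
}

// Annotate every original row with its department's stats.
val annotated = data.map { case (name, dept, sal) =>
  val (minS, maxS, minN, maxN) = byDept(dept)
  (name, dept, sal, minS, maxS, minN, maxN)
}
// annotated.head == ("James", "Sales", 3000, 3000, 4600, "James", "Michael")
```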
What I have so far is
import org.apache.spark.sql.functions.{min, max}

val df_1_5 = df_1.groupBy('department)
  .agg(min('salary).as("min_department_salary"), max('salary).as("max_department_salary"))
+----------+---------------------+---------------------+
|department|min_department_salary|max_department_salary|
+----------+---------------------+---------------------+
| Sales| 3000| 4600|
| Finance| 3000| 3900|
| Marketing| 2000| 3000|
+----------+---------------------+---------------------+
This isn't quite there yet. I have tried joining it back to the original df, but I would like to avoid a join because the DataFrame is fairly large.
You can use struct to carry the other column along:
import org.apache.spark.sql.functions.{min, max, struct}

df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
  .groupBy('department)
  .agg(min("sal-name").as("min"), max("sal-name").as("max"))
  .select($"department", $"min.*", $"max.*")
  .toDF("department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
  .show(false)
Output:
+----------+-------+------------+-------+------------+
|department|min_sal|min_sal_name|max_sal|max_sal_name|
+----------+-------+------------+-------+------------+
|Sales |3000 |James |4600 |Michael |
|Finance |3000 |Maria |3900 |Jen |
|Marketing |2000 |Kumar |3000 |Jeff |
+----------+-------+------------+-------+------------+
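This works because Spark compares structs field by field, salary first, so min/max over the struct pick the row with the extreme salary and carry its employee_name along. A plain-Scala tuple analogy (not Spark) of the same lexicographic comparison:

```scala
// Tuples compare lexicographically: first field first, later fields break ties.
// This mirrors how Spark orders struct(salary, employee_name) under min/max.
val sales = Seq((3000, "James"), (4600, "Michael"), (4100, "Robert"),
                (3000, "James"), (4100, "Saif"))
val minRow = sales.min  // lowest salary wins: (3000, "James")
val maxRow = sales.max  // highest salary wins: (4600, "Michael")
```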
If you want to keep all the rows, you can use a window function instead of groupBy:
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("department")

df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
  .withColumn("min", min("sal-name").over(window))
  .withColumn("max", max("sal-name").over(window))
  .select($"employee_name", $"department", $"min.*", $"max.*")
  .toDF("employee_name", "department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
  .show(false)
Output:
+-------------+----------+-------+------------+-------+------------+
|employee_name|department|min_sal|min_sal_name|max_sal|max_sal_name|
+-------------+----------+-------+------------+-------+------------+
|James |Sales |3000 |James |4600 |Michael |
|Michael |Sales |3000 |James |4600 |Michael |
|Robert |Sales |3000 |James |4600 |Michael |
|James |Sales |3000 |James |4600 |Michael |
|Saif |Sales |3000 |James |4600 |Michael |
|Maria |Finance |3000 |Maria |3900 |Jen |
|Scott |Finance |3000 |Maria |3900 |Jen |
|Jen |Finance |3000 |Maria |3900 |Jen |
|Jeff |Marketing |2000 |Kumar |3000 |Jeff |
|Kumar |Marketing |2000 |Kumar |3000 |Jeff |
+-------------+----------+-------+------------+-------+------------+
Use window aggregate functions instead of regular aggregate functions.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{min, max, first, last}

// Ordering the window by salary and widening the frame to the whole partition
// makes first/last deterministic, instead of relying on a prior sort.
val byDeptSalary = Window.partitionBy($"department").orderBy($"salary")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df2 = df_1
  .withColumn("min_department_salary", min("salary") over byDeptSalary)
  .withColumn("max_department_salary", max("salary") over byDeptSalary)
  .withColumn("min_salary_employee_name", first("employee_name") over byDeptSalary)
  .withColumn("max_salary_employee_name", last("employee_name") over byDeptSalary)
  .select("employee_name", "department", "salary",
    "min_department_salary", "max_department_salary",
    "min_salary_employee_name", "max_salary_employee_name")
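As a plain-Scala analogy (not Spark) for why first/last over a salary-ordered, full-partition frame yield the right names: once a department's rows are ordered by salary, the first name is the minimum earner and the last is the maximum earner.

```scala
// Sales department rows, ordered by salary ascending.
val sales = Seq(("James", 3000), ("Michael", 4600), ("Robert", 4100),
                ("James", 3000), ("Saif", 4100))
val ordered = sales.sortBy(_._2)
val minEarner = ordered.head._1  // "James"  — lowest salary
val maxEarner = ordered.last._1  // "Michael" — highest salary
```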