I'm new to Spark, and I just want to ask you this question related to Spark SQL. Let's consider the EMPLOYEE table:
Employee  Sub_department  Department
A         105182          10
A         105182          10     (data can be redundant!)
A         114256          11
A         127855          12
A         125182          12
B         136234          13
B         133468          13
Department is defined as substring(sub_department, 0, 2), i.e. only the first two digits of sub_department are extracted.
What I want to display is a split into 3 types of employees:
- Set 1: employees with at least 3 distinct departments (regardless of their sub_departments)
- Set 2: employees with at least 5 distinct sub_departments AND 2 distinct departments
- Set 3: employees with at least 10 distinct sub_departments within the same department
To be concrete, I don't know how to do this even in classic SQL. But at the very least, I think the final output could look like this:
Employee  Sub_department  total_sub_dept  Department  total_dept
A         105182          4               10          3
A         114256          4               11          3
A         127855          4               12          3
A         125182          4               12          3
"Finally", there would be a column named "Set" to show which set an employee can belong to, but it's optional, as I'm afraid computing such a value would be too heavy...
It's important to display the distinct values and their counts for EACH of the two columns (sub_department and department).
I have a very big table (with many columns and a lot of possibly redundant data), so I thought about doing this with a first partition on sub_department, stored in a first table; then a second partition on department (regardless of the "sub_department" value), stored in a second table; and finally an inner join between the two tables on the employee name.
But I got some wrong results, and I don't know whether there is a better way to do this, or at least an optimization, since the department column depends on sub_department (one GROUP BY instead of 2).
So, how can I solve this? I tried, but it seems impossible to combine a count over each of the 2 columns together with the columns themselves in a single query.
I'll help you with the requirement of Set 1, just to get you started. Please try to understand the query below; once that's done, Sets 2 and 3 are very simple to complete.
SELECT
    employee,
    total_dept
FROM
(
    SELECT
        employee,
        COUNT(Department) AS total_dept
    FROM
    (
        SELECT
            employee,
            Sub_department,
            SUBSTRING(Sub_department, 0, 2) AS Department,
            ROW_NUMBER() OVER (PARTITION BY employee, SUBSTRING(Sub_department, 0, 2) ORDER BY Sub_department) AS redundancy
        FROM
            table
    ) dedup
    WHERE redundancy = 1
    GROUP BY employee
) dept_counts
WHERE total_dept >= 3
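For example, Set 2 can then be done with the same deduplication pattern, or more compactly with COUNT(DISTINCT ...). A sketch, assuming the same table name as above:

```sql
-- Set 2 sketch: at least 5 distinct sub_departments AND at least 2 distinct departments
SELECT
    employee,
    total_sub_dept,
    total_dept
FROM
(
    SELECT
        employee,
        COUNT(DISTINCT Sub_department) AS total_sub_dept,
        COUNT(DISTINCT SUBSTRING(Sub_department, 0, 2)) AS total_dept
    FROM
        table
    GROUP BY employee
) counts
WHERE total_sub_dept >= 5 AND total_dept >= 2
```

Set 3 works the same way, except the inner aggregation groups by (employee, Department) so the distinct sub_departments are counted within a single department.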
EDIT 1:
SELECT
    full_data.employee,
    full_data.Sub_department,
    total_sub_dept_count.total_sub_dept,
    SUBSTRING(full_data.Sub_department, 0, 2) AS Department,
    total_dept_count.total_dept
FROM
(
    SELECT
        employee,
        COUNT(Department) AS total_dept
    FROM
    (
        SELECT
            employee,
            Sub_department,
            SUBSTRING(Sub_department, 0, 2) AS Department,
            ROW_NUMBER() OVER (PARTITION BY employee, SUBSTRING(Sub_department, 0, 2) ORDER BY Sub_department) AS redundancy
        FROM
            employee_table
    ) dedup_dept
    WHERE redundancy = 1
    GROUP BY employee
) total_dept_count
JOIN
(
    SELECT
        employee,
        COUNT(Sub_department) AS total_sub_dept
    FROM
    (
        SELECT
            employee,
            Sub_department,
            ROW_NUMBER() OVER (PARTITION BY employee, Sub_department ORDER BY Sub_department) AS redundancy
        FROM
            employee_table
    ) dedup_sub_dept
    WHERE redundancy = 1
    GROUP BY employee
) total_sub_dept_count
ON (total_dept_count.employee = total_sub_dept_count.employee)
JOIN
    employee_table full_data
ON (total_sub_dept_count.employee = full_data.employee)
You can use the window function collect_set() to get the result. Check this out:
scala> val df = Seq(("A","105182","10"), ("A","105182","10" ), ("A","114256","11"), ("A","127855","12"), ("A","125182","12"), ("B","136234","13"), ("B","133468","13")).toDF("emp","subdept","dept")
df: org.apache.spark.sql.DataFrame = [emp: string, subdept: string ... 1 more field]
scala> df.printSchema
root
|-- emp: string (nullable = true)
|-- subdept: string (nullable = true)
|-- dept: string (nullable = true)
scala> df.show
+---+-------+----+
|emp|subdept|dept|
+---+-------+----+
| A| 105182| 10|
| A| 105182| 10|
| A| 114256| 11|
| A| 127855| 12|
| A| 125182| 12|
| B| 136234| 13|
| B| 133468| 13|
+---+-------+----+
scala> val df2 = df.withColumn("dept2",substring('subdept,3,7))
df2: org.apache.spark.sql.DataFrame = [emp: string, subdept: string ... 2 more fields]
scala> df2.createOrReplaceTempView("salaman")
scala> spark.sql(""" select *, size(collect_set(subdept) over(partition by emp)) sub_dep_count, size(collect_set(dept) over(partition by emp)) dep_count from salaman """).show(false)
+---+-------+----+-----+-------------+---------+
|emp|subdept|dept|dept2|sub_dep_count|dep_count|
+---+-------+----+-----+-------------+---------+
|B |136234 |13 |6234 |2 |1 |
|B |133468 |13 |3468 |2 |1 |
|A |105182 |10 |5182 |4 |3 |
|A |105182 |10 |5182 |4 |3 |
|A |125182 |12 |5182 |4 |3 |
|A |114256 |11 |4256 |4 |3 |
|A |127855 |12 |7855 |4 |3 |
+---+-------+----+-----+-------------+---------+
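Building on the same temp view, the optional "Set" column from the question could be computed in the same single pass; a third window partitioned by (emp, dept) covers Set 3. A sketch only (the thresholds come from the question's definitions, and an employee matching several sets gets the first matching label; the sample data is too small to populate Sets 2 and 3):

```sql
SELECT *,
    CASE
        WHEN dep_count >= 3                        THEN 1  -- Set 1
        WHEN sub_dep_count >= 5 AND dep_count >= 2 THEN 2  -- Set 2
        WHEN sub_dep_in_dept >= 10                 THEN 3  -- Set 3
    END AS set_id
FROM
(
    SELECT *,
        size(collect_set(subdept) OVER (PARTITION BY emp))       AS sub_dep_count,
        size(collect_set(dept)    OVER (PARTITION BY emp))       AS dep_count,
        size(collect_set(subdept) OVER (PARTITION BY emp, dept)) AS sub_dep_in_dept
    FROM salaman
) t
```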