我目前有JSON文件,从中我能够转储它的数据到一个临时视图通过。以下Python (PySpark)逻辑:
departMentData = spark
.read
.option("multiLine", True)
.option("mode", "PERMISSIVE")
.json("C:\Testdata.json")
.createOrReplaceTempView("vw_TestView")
这个临时视图以数组的形式保存了部门的数据和该部门的员工列表。一个员工可以是多个部门的一部分。
下面是这个视图的数据类型:
- DeptID:字符串
- DeptName:字符串
- employeeid: array<字符串.>字符串.>
和vw_TestView的表数据
DeptID | DeptName | EmployeeIDs | D01 | dev | ["U1234","U6789" |
---|---|---|
D02 | qa | ["U1234","U2345" |
您可以尝试使用explode
将Employee id列表拆分为不同的行,然后再将它们连接起来,并使用collect_list
将条目聚合为一个列表。
使用spark sql:
NB。确保Employees
作为表/视图可用,例如EmployeeData.createOrReplaceTempView("Employees")
WITH dept_employees AS (
SELECT
DeptId,
DeptName,
explode(EmployeeIDs)
FROM
vw_TestView
)
SELECT
d.DeptId,
d.DeptName,
collect_list(e.EmpID) as EmployeeIDs,
collect_list(e.EmpName) as EmployeeNames
FROM
dept_employees d
INNER JOIN
Employees e ON d.col=e.EmpID
GROUP BY
d.Deptid,
d.DeptName
或使用pyspark api
from pyspark.sql import functions as F
output_df = (
departMentData.select(
F.col("DeptId"),
F.col("DeptName"),
F.explode("EmployeeIDs")
)
.alias("d")
.join(
EmployeeData.alias("e"),
F.col("d.col")==F.col("e.EmpID"),
"inner"
)
.groupBy("d.DeptId","d.DeptName")
.agg(
F.collect_list("e.EmpID").alias("EmployeeIDs"),
F.collect_list("e.EmpName").alias("EmployeeNames")
)
)
让我知道这是否适合你。