I'm running into some trouble with groupByKey in Scala and Spark. I have two case classes:
case class Employee(id_employee: Long, name_emp: String, salary: String)
Currently I use this second case class:
case class Company(id_company: Long, employee: Seq[Employee])
However, I would like to use this new one instead:
case class Company(id_company: Long, name_comp: String, employee: Seq[Employee])
There is a parent Dataset (DF1) that I group with groupByKey to build the Company objects:
val companies = df1.groupByKey(v => v.id_company)
  .mapGroups { case (k, iter) =>
    Company(k, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq)
  }
  .collect()
This code works and returns objects like this:
Company(1234,List(Employee(0987, John, 30000),Employee(4567, Bob, 50000)))
However, I can't find a way to add the company's name_comp to these objects (the field is present in DF1), in order to retrieve objects like this (built with the new case class):
Company(1234, NYTimes, List(Employee(0987, John, 30000),Employee(4567, Bob, 50000)))
Since you want both the company id and the company name, you can use a tuple as the key when grouping the data. That makes both values readily available when constructing the Company objects:
df1.groupByKey(v => (v.id_company, v.name_comp))
  .mapGroups { case ((id, name), iter) =>
    Company(id, name, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq)
  }
  .collect()
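For completeness, here is a minimal, self-contained sketch of that approach. The SparkSession setup, the GroupByKeyDemo object, and the Row1 case class standing in for DF1's flat schema are assumptions added for illustration; only the field names come from the question:

import org.apache.spark.sql.SparkSession

case class Employee(id_employee: Long, name_emp: String, salary: String)
case class Company(id_company: Long, name_comp: String, employee: Seq[Employee])

// Hypothetical flat row type standing in for the schema of DF1
case class Row1(id_company: Long, name_comp: String, id_employee: Long, name_emp: String, salary: String)

object GroupByKeyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("groupByKey-demo").getOrCreate()
    import spark.implicits._

    // Sample data standing in for DF1
    val df1 = Seq(
      Row1(1234L, "NYTimes", 987L, "John", "30000"),
      Row1(1234L, "NYTimes", 4567L, "Bob", "50000")
    ).toDS()

    val companies = df1
      .groupByKey(v => (v.id_company, v.name_comp)) // tuple key: (id, name)
      .mapGroups { case ((id, name), iter) =>
        // iter holds every row belonging to one (id, name) group
        Company(id, name, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq)
      }
      .collect()

    // Prints something along the lines of:
    // Company(1234,NYTimes,List(Employee(987,John,30000), Employee(4567,Bob,50000)))
    companies.foreach(println)

    spark.stop()
  }
}

Note that grouping on the (id_company, name_comp) tuple produces exactly the same groups as grouping on id_company alone, as long as each id_company maps to a single name_comp in DF1.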