Adding values with groupByKey



I'm running into some trouble with groupByKey in Scala and Spark. I have two case classes:

case class Employee(id_employee: Long, name_emp: String, salary: String)

Currently I use this second case class:

case class Company(id_company: Long, employee:Seq[Employee])

However, I'd like to use this new one instead:

case class Company(id_company: Long, name_comp: String, employee: Seq[Employee])

There is a parent dataset (df1) from which I build the Company objects with groupByKey:

val companies = df1.groupByKey(v => v.id_company)
  .mapGroups { case (k, iter) =>
    Company(k, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq)
  }
  .collect()

This code works and returns objects like this:

Company(1234,List(Employee(0987, John, 30000),Employee(4567, Bob, 50000)))

However, I can't figure out how to add the company's name_comp to these objects (the field exists in df1), so as to get objects like this (using the new case class):

Company(1234, NYTimes, List(Employee(0987, John, 30000),Employee(4567, Bob, 50000)))

Since you want both the company id and the company name, you can use a tuple as the key when grouping the data. That makes both values readily available when constructing the Company object:

df1.groupByKey(v => (v.id_company, v.name_comp))
  .mapGroups { case ((id, name), iter) =>
    Company(id, name, iter.map(x => Employee(x.id_employee, x.name_emp, x.salary)).toSeq)
  }
  .collect()
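The same composite-key idea can be illustrated with plain Scala collections, independent of Spark. This is a minimal sketch: the Row case class and the sample data are made up to stand in for the rows of df1, but the grouping logic mirrors the answer above:

```scala
// Case classes from the question.
case class Employee(id_employee: Long, name_emp: String, salary: String)
case class Company(id_company: Long, name_comp: String, employee: Seq[Employee])

// Hypothetical flat row shape, standing in for a row of df1:
// one row per employee, with the company id and name repeated on each row.
case class Row(id_company: Long, name_comp: String,
               id_employee: Long, name_emp: String, salary: String)

val rows = Seq(
  Row(1234L, "NYTimes", 987L, "John", "30000"),
  Row(1234L, "NYTimes", 4567L, "Bob", "50000")
)

// Group on the (id, name) tuple so both fields survive the grouping
// and can be passed to the Company constructor.
val companies = rows
  .groupBy(r => (r.id_company, r.name_comp))
  .map { case ((id, name), rs) =>
    Company(id, name, rs.map(r => Employee(r.id_employee, r.name_emp, r.salary)))
  }
  .toSeq
```

The key point is the same as in the Spark version: grouping by a tuple instead of a single field keeps every value you need for the result type available after the group step.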
