flink 中的案例类序列化



我正在尝试使用 Scala 的案例类构建数据集(我想在元组上使用案例类,因为我想按名称连接字段)。

这是我正在处理的联接的一个迭代:

case class TestTarget(tacticId: String, partnerId:Long)
campaignPartners.join(partnerInput).where(1).equalTo("id") {
   (target, partnerInfo, out: Collector[TestTarget]) => {
       partnerInfo.partner_pricing match {
           case Some(pricing) =>
             out.collect(TestTarget(target._1, partnerInfo.partner_id))
           case None => ()
    }
  }
}

显然,这会引发错误:

org.apache.flink.api.common.InvalidProgramException: Task not 可序列化为 org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) 在 org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171) at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:121) at org.apache.flink.api.scala.JoinDataSet$$anon$2.(joinDataSet.scala:108) 在 org.apache.flink.api.scala.JoinDataSet.apply(joinDataSet.scala:107) 在 com.adfin.dataimport.vendors.dbm.Job.calculateVendorFee(Job.scala:84)

我在这里看到了文档,指出我需要为类实现可序列化。据我所知,在新版本的 Scala 中,没有自动序列化案例类的好方法。(我研究了手动序列化,但我认为我需要对链接做一些额外的工作才能正常工作)。

编辑:根据Till Rohrmann的建议,我尝试使用一个小案例重现此错误。这就是我用来尝试重现错误的方法。此示例有效,我未能重现错误。我还尝试将选项案例放在任何地方,但这会导致工作失败。

val text = env.fromElements("To be, or not to be,--that is the question:--")
val words = text.flatMap { _.toLowerCase.split("\W+") }.map(x => (1,x))
val nums = env.fromElements(List(1,2,3,4,5)).flatMap(x => x).map(x => First(1,x))

val counts = words.join(nums).where(0).equalTo("a") {
  (a, b, out: Collector[TestTarget]) => {
    b.b match {
      case 2 => ()
      case _ => out.collect(TestTarget(a._2, b.b))
    }
  }
}

我的程序的定义使用了一个类

class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment)
        extends DspJob(conf){
    ...
    case class TestTarget(tacticId: String, partnerId:Long)
    campaignPartners.join(partnerInput).where(1).equalTo("id") {
    ...
}

由于它是一个内部类,因此不会自动序列化

如果您将类切换为不是内部类,那么一切都会解决

case class TestTarget(tacticId: String, partnerId:Long)
class Job(conf: AdfinConfig)(implicit env: ExecutionEnvironment)
        extends DspJob(conf){
    ...
    words.join( ....) 
    ...
}

相关内容

  • 没有找到相关文章

最新更新