Task not serializable in Flink



I am trying to run the basic PageRank example in Flink with a slight modification (only in reading the input file; everything else is the same), and I get the error "Task not serializable". Below is part of the error output:

at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)

Here is my code:
// Imports needed by this snippet:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._ // for .as(...) and .toDataSet[T]
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.util.Collector

// Case classes assumed by the snippet below:
case class Link(sourceId: Long, targetId: Long)
case class Id(pageId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

object hpdb {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val maxIterations = 10000
    val DAMPENING_FACTOR: Double = 0.85
    val EPSILON: Double = 0.0001
    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"
    val links = env.readCsvFile[Tuple2[Long, Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
                fieldDelimiter = "\t", includedFields = Array(1, 4)).as('sourceId, 'targetId).toDataSet[Link] // source and target
    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id] // page id
    val noOfPages = pages.count()
    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))
    val adjacencyLists = links
      // initialize lists: ._1 is the source id and ._2 is the target id
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
      (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
    }
    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
     // **//the output shows error here**     
     currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
          (page, adjacent, out: Collector[Page]) =>
            for (targetId <- adjacent.targetIds) {
              out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
            }
        }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
         //**//the output shows error here** 
           .map { p =>
          Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count()))
        }
        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }
        (newRanks, termination)
    }
    val result = finalRanks
    // emit result
    result.writeAsCsv(outpath, "\n", " ")
    env.execute()
  }
}

Any help in the right direction is highly appreciated. Thank you.

The problem is that you reference the DataSet pages from within a MapFunction. This is not possible, because a DataSet is only the logical representation of a data flow and cannot be accessed at runtime.

To solve this problem, you have to assign the value of pages.count to a variable, e.g. val pagesCount = pages.count, and refer to that variable inside your MapFunction.

What pages.count actually does is trigger the execution of the data flow graph so that the number of elements in pages can be computed. The result is then returned to your program.
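A minimal, self-contained sketch of this pattern (the object name CountCapture and the sample data are illustrative, not from the original code):

import org.apache.flink.api.scala._

object CountCapture {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val pages = env.fromElements(1L, 2L, 3L)

    // BAD: referencing the DataSet `pages` inside the closure fails,
    // because a DataSet is only a logical handle and is not serializable:
    // val ranks = pages.map(p => 1.0 / pages.count())

    // GOOD: trigger execution once, capture the result as a plain Long,
    // and let the closure capture that serializable value instead:
    val pagesCount = pages.count()
    val ranks = pages.map(p => 1.0 / pagesCount)

    ranks.print()
  }
}

Note that the code in the question already computes val noOfPages = pages.count(), so the map that applies the dampening factor can simply reuse noOfPages instead of calling pages.count() again.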
