I am trying to run the basic PageRank example in Flink with a small modification (only in how the input file is read; everything else is the same), and I get the error "Task not serializable". Below is part of the error output:
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)

Below is my code:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.util.Collector

// Case classes backing the typed DataSets (reconstructed from their usage below).
case class Link(sourceId: Long, targetId: Long)
case class Id(pageId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

object hpdb {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val maxIterations = 10000
    val DAMPENING_FACTOR: Double = 0.85
    val EPSILON: Double = 0.0001
    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"

    // source and target ids
    val links = env.readCsvFile[Tuple2[Long, Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1, 4)).as('sourceId, 'targetId).toDataSet[Link]

    // page ids
    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id]
    val noOfPages = pages.count()

    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))

    val adjacencyLists = links
      // initialize lists: one singleton target list per link
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists per source id
      .groupBy("sourceId").reduce {
        (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
      }
    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      // **the output shows the error here**
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              for (targetId <- adjacent.targetIds) {
                out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
              }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          // **the output shows the error here**
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count()))
          }

        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for a significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }

        (newRanks, termination)
    }
    val result = finalRanks

    // emit result
    result.writeAsCsv(outpath, "\n", " ")

    env.execute()
  }
}
Any help in the right direction is highly appreciated. Thank you.
The problem is that you reference the DataSet pages from within a MapFunction. This is not possible, because a DataSet is only the logical representation of a data flow and cannot be accessed at runtime.
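For illustration, here is a minimal sketch that hits the same ClosureCleaner error; the object name and the toy inputs are made up, but the pattern (capturing one DataSet inside a transformation on another) is the same as in the question:

import org.apache.flink.api.scala._

object NotSerializableRepro {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val pages = env.fromElements(1L, 2L, 3L)
    val ranks = env.fromElements(0.5, 0.25, 0.25)

    // Fails during closure cleaning: the map closure captures the DataSet
    // `pages`, which is not serializable and cannot be evaluated at runtime.
    val broken = ranks.map(r => r / pages.count())

    broken.print()
  }
}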
To solve the problem, you have to assign the value of pages.count to a variable, with val pagesCount = pages.count, and refer to this variable inside your MapFunction.
What pages.count actually does is trigger the execution of the data flow graph, so that the number of elements in pages can be computed. The result is then returned to your program.
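Applied to the code above, a sketch of the fixed iteration (reusing the case classes, constants, and DataSets from the question) could look like this; the only changes are that the count is computed once up front and the map closure captures the resulting Long instead of the DataSet:

// Compute the count once, eagerly, before the iteration is built.
// `pagesCount` is a plain Long, which the closure cleaner accepts.
val pagesCount: Long = pages.count()

val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / pagesCount))

val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
          for (targetId <- adjacent.targetIds) {
            out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
          }
      }
      .groupBy("pageId").aggregate(SUM, "rank")
      // The closure now captures only pagesCount, not the DataSet pages.
      .map(p => Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pagesCount)))

    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}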