我试图运行Flink网站上提供的CoGroup函数的Scala代码示例,但它抛出了错误"值映射不是Object"的成员;。这是我的代码
val iVals: DataSet[(String, Int)] = env.fromCollection(Seq(("a",1),("b",2),("c",3)))
val dVals: DataSet[(String, Int)] = env.fromCollection(Seq(("a",11),("b",22)))
val output = iVals.coGroup(dVals).where(0).equalTo(0) {
(iVals, dVals, out: Collector[Double]) =>
val ints = iVals map { _._2 } toSet
for (dVal <- dVals) {
for (i <- ints) {
out.collect(dVal._2 * i)
}
}
}
output.print()
我不知道是什么原因导致了错误,或者是否有我遗漏的库需要导入?谢谢
是否尝试添加iVals
和dVals
的类型注释?Scala似乎在推断类型Object
,因此出现了错误。(为什么,我不知道(。
我的意思是:
(iVals: Iterator[(String, Int)], dVals: Iterator[(String, Int)], out: Collector[Double]) =>