Scala编译器无法推断Spark lambda函数内部的类型



假设我有用Scala 2.12 编写的Spark代码

val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach {
entry: String => println(entry)
})

当我运行代码时,编译器给出了这个错误


[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error]     empty.foreachPartition( partition => partition.foreach{
[error]                                                    ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM

为什么编译器partition作为Object而不是Iterator[String]

我必须手动添加partition类型才能使代码正常工作。

val dataset = spark.emptyDataset[String]
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
entry: String => println(entry)
})

这是因为foreachPartition和Java Scala互操作的两个重载版本。

如果代码仅在Scala中(这是最小的代码,独立于Spark(

val dataset: Dataset[String] = ???
dataset.foreachPartition(partition => ???)
class Dataset[T] {
def foreachPartition(f: Iterator[T] => Unit): Unit = ???
def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}
trait ForeachPartitionFunction[T] extends Serializable {
def call(t: Iterator[T]): Unit
}

则可以推断出CCD_ 6的类型(作为CCD_。

但在实际的Spark代码中,ForeachPartitionFunction是Java接口,其方法call接受java.util.Iterator[String]

所以两种选择

dataset.foreachPartition((
(partition: scala.collection.Iterator[String]) => ??? 
): Iterator[String] => Unit)
dataset.foreachPartition((
(partition: java.util.Iterator[String]) => ??? 
): ForeachPartitionFunction[String])

符合条件,编译器无法推断CCD_ 11的类型。

Scala中的推理是本地的,所以编译器可以看到partition => partition.foreach...(而java.util.Iterator[String]没有方法foreach(之后,再输入partition就太晚了。

就像@Dmytro所说的那样,scala编译器无法推断它应该应用哪个重载函数。但是,有一个简单的解决方法可以使用,通过使用这个辅助函数:

def helper[I](f: I => Unit): I => Unit = f

现在你所需要做的就是:

dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
helper[String](entry => println(entry))
})

最新更新