假设我有用Scala 2.12 编写的Spark代码
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach {
entry: String => println(entry)
})
当我运行代码时,编译器给出了这个错误
[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error] empty.foreachPartition( partition => partition.foreach{
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM
为什么编译器partition
作为Object
而不是Iterator[String]
?
我必须手动添加partition
类型才能使代码正常工作。
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
entry: String => println(entry)
})
这是因为foreachPartition
和Java Scala互操作的两个重载版本。
如果代码仅在Scala中(这是最小的代码,独立于Spark(
val dataset: Dataset[String] = ???
dataset.foreachPartition(partition => ???)
class Dataset[T] {
def foreachPartition(f: Iterator[T] => Unit): Unit = ???
def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
}
trait ForeachPartitionFunction[T] extends Serializable {
def call(t: Iterator[T]): Unit
}
则可以推断出CCD_ 6的类型(作为CCD_。
但在实际的Spark代码中,ForeachPartitionFunction
是Java接口,其方法call
接受java.util.Iterator[String]
。
所以两种选择
dataset.foreachPartition((
(partition: scala.collection.Iterator[String]) => ???
): Iterator[String] => Unit)
dataset.foreachPartition((
(partition: java.util.Iterator[String]) => ???
): ForeachPartitionFunction[String])
符合条件,编译器无法推断CCD_ 11的类型。
Scala中的推理是本地的,所以编译器可以看到partition => partition.foreach...
(而java.util.Iterator[String]
没有方法foreach
(之后,再输入partition
就太晚了。
就像@Dmytro所说的那样,scala编译器无法推断它应该应用哪个重载函数。但是,有一个简单的解决方法可以使用,通过使用这个辅助函数:
def helper[I](f: I => Unit): I => Unit = f
现在你所需要做的就是:
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach {
helper[String](entry => println(entry))
})