Spark --Error :类型不匹配;找到:(整数,字符串)必需:TraversableOnce[?]



我是Spark编程和scala的新手,我无法理解map和flatMap之间的区别。使用flatMap时,为什么在方法中使用"选项"工作正常

def parseNames(line: String) : Option[(Int,String)]  = {
var fields = line.split('"')
if (fields.length >1) {
return Some(fields(0).trim().toInt,fields(1) )
}
else {
return None
}
}
def main(args: Array[String]) {
val sc = new SparkContext("local[*]","DemoHero")
val txt= sc.textFile("../marvel-names1.txt")  
val rdd = txt.flatMap(parseNames)

但是没有"选项",就会出现错误:

def parseNames(line: String) : (Int, String)  = {
var fields = line.split('"')    
(fields(0).trim().toInt,fields(1) )
}
def main(args: Array[String]) {
val sc = new SparkContext("local[*]","DemoHero")   
val txt= sc.textFile("../marvel-names1.txt")  
val rdd = txt.flatMap(parseNames)

根据我的理解,平面映射使 Rdd 成为字符串/Int Rdd 的集合。我在想,在这种情况下,两者都应该没有任何错误地工作。请让我知道我在哪里犯了错误。

TL;DR:有一个从OptionIterable的隐式转换,这就是为什么你的第一个flatMap不会失败。


Option的继承层次结构来看,根本不清楚为什么RDD的flatMap期望 返回类型中带有TraversableOnce的参数将接受返回Option的函数,因为Option不扩展TraversableOnce

但是,如果打印flatMap生成的脱糖代码,则会出现以下合成函数定义:

@SerialVersionUID(value = 0) final <synthetic> class anonfun$1 extends scala.runtime.AbstractFunction1 with Serializable {
final def apply(line: String): Iterable = scala.this.Option.option2Iterable(org.example.ClassName.parseNames$1(line));
final <bridge> <artifact> def apply(v1: Object): Object = anonfun$1.this.apply(v1.$asInstanceOf[String]());
def <init>(): <$anon: Function1> = {
anonfun$1.super.<init>();
()
}
}

细节并不那么重要,它是一些需要line: String并返回Iterable的东西。 有趣的是Option.option2Iterable部分。

这是直接在选项上定义的隐式转换, 它悄悄地将选项转换为IterableIterableTraversableOnce的特例。

这就是编译器如何将option2Iterable潜入合成Function定义的方式 在您的方法和flatMap调用之间进行调解。现在你有一个论点 键入String => Iterable[(Int, String)],因此flatMap可以正常编译。

请注意,如果没有包装方法的合成Function实例,它将无法工作。如果您像这样声明parseNames

def parseNames: String => Option[(Int,String)] = { line => 

这将是一个简单的编译器错误。


你的第二个代码片段不应该编译,幸运的是,它确实没有编译:对不是Traversable,所以flatMap不接受parseNames(line: String) : (Int, String)作为论据。你想在这里使用的是什么map,因为你想将每个字符串恰好映射到一对(Int, String)

flatMap用于不同的用例:它用于将原始集合中的每个元素转换为 另一个集合,然后将所有生成的集合平展为单个集合,例如,

sc.parallelize(List(1, 2, 3)).flatMap{ x => List(x, x*x, x*x*x) }

将首先为每个x生成一个TraversableOnce

List(1,1,1)
List(2,4,8)
List(3,9,27)

然后将它们全部粘合在一起,以便获得带有条目的RDD

1,1,1,2,4,8,3,9,27

它以相同的方式与Option一起工作,因为"道德上"它类似于具有 0 到 1 元素的列表,即使它在其继承层次结构中没有明确说明。

注意关于"不应该编译

"的表述:每当我写(你的代码或其他一些代码)"不应该编译"时,我并不是说我通常希望你的代码中有编译错误。我的意思是,如果代码中存在一些问题,编译器应该尽快生成明确的错误消息。

def parseNames (line: String): Option[(Int,String)]  = {
var fields = line.split('"')
if (fields.length > 1) {
Some (fields(0).trim ().toInt, fields(1))
}
else {
None
}
}

(去掉了嘈杂的"返回")

那么,None什么时候归还?如果 fields.length 不是> 1。如果没有至少 2 个字段(字段 (0) 和字段 (1)),fields(0).trim().toInt可能会成功,但fields(1)会失败。

flatMap需要迭代对象作为被调用函数的返回类型。因为flatMap会遍历可迭代的每个元素并返回扁平化的每个元素。

在第一个parseNames函数中,返回Option[(Int, String)],它是一个容器,由于使用了隐式函数,它可以像可迭代对象一样运行。所以flatMap工作了。

但是在你的第二个parseNames中,返回的Tuple2[Int, String]不是一个可迭代的。由于Tuple2无法迭代,但可以使用_1_2获取元素。所以flatMap向你显示编译错误.

我希望解释清楚。

如果您返回Array,则第二个parseNames会起作用

def parseNames(line: String) : Array[(Int, String)]  = {
var fields = line.split('"')
Array((fields(0).trim().toInt,fields(1)))
}

List

def parseNames(line: String) : List[(Int, String)]  = {
var fields = line.split('"')
List((fields(0).trim().toInt,fields(1)))
}

Seq

def parseNames(line: String) : Seq[(Int, String)]  = {
var fields = line.split('"')
Seq((fields(0).trim().toInt,fields(1)))
}

因为它们都是可迭代对象Option

相关内容

最新更新