我试图映射一个RDD这样(见输出的结果)和映射减少的十进制值,我一直得到错误。当我尝试使用reduceByKey()与单词计数它工作得很好。十进制值的求和方式不同吗?
val voltageRDD= myRDD.map(i=> i.split(";"))
.filter(i=> i(0).split("/")(2)=="2008")
.map(i=> (i(0).split("/")(2),i(2).toFloat)).take(5)
输出:voltageRDD: Array[(String, Float)] = Array((2008,1.62), (2008,1.626), (2008,1.622), (2008,1.612), (2008,1.612))
当试图减少:
val voltageRDD= myRDD.map(i=> i.split(";"))
.filter(i=> i(0).split("/")(2)=="2008")
.map(i=> (i(0).split("/")(2),i(2).toFloat)).reduceByKey(_+_).take(5)
我得到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2954.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2954.0 (TID 15696, 10.19.240.54): java.lang.NumberFormatException: For input string: "?"
如果您的数据包含不能被浮点数解析的列,那么您应该事先过滤掉它们或相应地处理它们。这样的处理可能意味着,如果您看到一个不可解析的条目,您将分配一个值0.0f
。下面的代码就是这样做的。
val voltageRDD= myRDD.map(i=> i.split(";"))
.filter(i => i(0).split("/")(2)=="2008")
.map(i => (i(0).split("/")(2), Try{ i(2).toFloat }.toOption.getOrElse(0.0f)))
.reduceByKey(_ + _).take(5)
简短版:你可能有一行i(2)
等于?
。
根据我的评论,你的数据很可能不一致,这在第一个代码片段中不会成为问题,因为take(5)
和没有需要spark对整个数据集执行操作的操作。Spark是懒惰的,因此它只会执行计算,直到它从map -> filter -> map
链得到5
的结果。
另一方面,第二个代码片段将对整个数据集执行计算,因此它可以执行reduceByKey
,只有这样它才会得到5个结果,因此它可能会捕捉到第一个代码片段中数据集太远的问题。