Records missing after creating a table from a Spark temp table in Spark 2



I created a DataFrame from the sequence below.

val df = sc.parallelize(Seq(
  (100, 23, 9.50),
  (100, 23, 9.51),
  (100, 24, 9.52),
  (100, 25, 9.54),
  (100, 23, 9.55),
  (101, 21, 8.51),
  (101, 23, 8.52),
  (101, 24, 8.55),
  (101, 20, 8.56))).toDF("id", "temp", "time")

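I want to update the DataFrame by adding rows wherever data is missing. To do that, I iterate over the DataFrame inside mapPartitions and add the new rows there.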

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, Column}
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("id").orderBy("time")
val leadDf = df.withColumn("time_diff", ((lead("time", 1).over(w) - df("time")).cast("Float")*100).cast("int"))
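The intent of time_diff and of the row filling can be illustrated without Spark. The following is only a sketch under assumptions: `Reading`, `withGaps`, `fillGaps` and the integer `timeHundredths` field are hypothetical names introduced here, and since `incrementValue` is not shown in the post, the sketch assumes each added row copies id and temp from the preceding row and advances the time by 0.01.

```scala
// Plain-Scala sketch, no Spark. Times are stored as integer hundredths
// (9.50 -> 950) to sidestep floating-point rounding; this is an assumption.
final case class Reading(id: Int, temp: Int, timeHundredths: Int)

// Pair every reading with the gap to the next reading of the same id
// (None for the last reading), similar in spirit to lead("time", 1).
def withGaps(rows: Seq[Reading]): Seq[(Reading, Option[Int])] =
  rows.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (_, group) =>
    val sorted = group.sortBy(_.timeHundredths)
    val gaps = sorted.zip(sorted.drop(1)).map { case (a, b) =>
      Option(b.timeHundredths - a.timeHundredths)
    } :+ None
    sorted.zip(gaps)
  }

// Emit each reading, followed by one copy per missing hundredth.
def fillGaps(rows: Seq[Reading]): Seq[Reading] =
  withGaps(rows).flatMap {
    case (r, Some(gap)) if gap > 1 =>
      r +: (1 until gap).map(i => r.copy(timeHundredths = r.timeHundredths + i))
    case (r, _) => Seq(r)
  }

val sample = Seq(
  Reading(100, 23, 950), Reading(100, 23, 951), Reading(100, 24, 952),
  Reading(100, 25, 954), Reading(100, 23, 955),
  Reading(101, 21, 851), Reading(101, 23, 852),
  Reading(101, 24, 855), Reading(101, 20, 856))

val filled = fillGaps(sample)
println(filled.size) // 12: nine original rows plus 9.53, 8.53 and 8.54
```

For the sample data this yields 12 rows, which matches the count reported for the DataFrame in the question.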

This is the iteration over the DataFrame:

val result = leadDf.rdd.mapPartitions(itr =>
  new Iterator[Row] {
    var prevRow = null: Row
    var prevDone = true
    var firstRow = true
    var outputRow: Row = null: Row
    var counter = 0
    var currRecord = null: Row
    var currRow: Row = if (itr.hasNext) { currRecord = itr.next; currRecord } else null
    prevRow = currRow
    override def hasNext: Boolean = {
      if (!prevDone) {
        prevRow = incrementValue(prevRow, 2)
        outputRow = prevRow
        counter = counter - 1
        if (counter == 0) {
          prevDone = true
        }
        true
      } else if (itr.hasNext) {
        prevRow = currRow
        if (counter == 0 && prevRow.getAs[Int](3) != 1 && !isNullValue(prevRow, 3)) {
          outputRow = prevRow
          counter = prevRow.getAs[Int](3) - 1
          prevDone = false
        } else if (counter > 0) {
          counter = counter - 1
          prevDone = false
        } else {
          outputRow = currRow
        }
        //if(counter == 0){
        currRow = itr.next
        true
      } else if (currRow != null) {
        outputRow = currRow
        currRow = null
        true
      } else {
        false
      }
    }
    override def next(): Row = outputRow
  })
val newDf = spark.createDataFrame(result,leadDf.schema)

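After this, I can see 12 records in the DataFrame. However, the physical table created from the temp table that was registered from the "newDf" DataFrame contains only 10 records.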

newDf.registerTempTable("test")
spark.sql("create table newtest as select * from test")
scala> newDf.count
res14: Long = 12
scala> spark.sql("select * from newtest").count
res15: Long = 10

The same code works fine in Spark 1.6, where the final table count matches the DataFrame record count.

Can anyone explain why this happens, and suggest a solution or workaround?

I found a solution, or at least a workaround: call the repartition method on the DataFrame newly created from the RDD[Row].

val newDf = spark.createDataFrame(result,leadDf.schema).repartition(result.getNumPartitions)
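One possible contributing factor, offered as an assumption rather than something confirmed in this post: the custom iterator above advances its state inside hasNext, while Scala's Iterator contract allows a caller to invoke hasNext any number of times before next(). Whether the Spark 2 write path actually does that is not verified here. The sketch below only demonstrates the general effect in plain Scala; `EagerHasNext`, `LookAheadIterator` and `drainWithDoubleCheck` are hypothetical names.

```scala
// Like the iterator in the question, this one consumes input inside hasNext.
// Calling hasNext twice before next() silently skips an element.
class EagerHasNext(underlying: Iterator[Int]) extends Iterator[Int] {
  private var current: Int = 0
  override def hasNext: Boolean =
    if (underlying.hasNext) { current = underlying.next(); true } else false
  override def next(): Int = current
}

// Look-ahead variant: hasNext only fills an empty slot,
// so repeated calls do not consume further input.
class LookAheadIterator(underlying: Iterator[Int]) extends Iterator[Int] {
  private var lookahead: Option[Int] = None
  override def hasNext: Boolean = {
    if (lookahead.isEmpty && underlying.hasNext) lookahead = Some(underlying.next())
    lookahead.isDefined
  }
  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    val value = lookahead.get
    lookahead = None
    value
  }
}

// A consumer that checks hasNext twice per element, which the contract permits.
def drainWithDoubleCheck(it: Iterator[Int]): List[Int] = {
  val out = List.newBuilder[Int]
  while (it.hasNext && it.hasNext) out += it.next()
  out.result()
}

val eagerResult = drainWithDoubleCheck(new EagerHasNext(Iterator(1, 2, 3, 4)))
val safeResult = drainWithDoubleCheck(new LookAheadIterator(Iterator(1, 2, 3, 4)))
println(eagerResult) // List(2, 4)
println(safeResult)  // List(1, 2, 3, 4)
```

If this is what happens here, an iterator whose hasNext is idempotent would return the same rows regardless of how the consumer calls it, which may be worth trying alongside the repartition workaround.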
