根据Spark Scala中的条件更新数据集内数组的元素



>我有一个具有格式的数据集

scala> rxClaimsUpdated.take(1)
res0: Array[(String, Array[String])] = Array((186037020,Array(
22960551, 
hfeu0ysji96afjdicbmqbheop0zsbfuvs4ongjb6yqg=,
095aa9d791b7b0b0f7f312435b8e30f1, 
2016-10-15, 
2015-02-13, 
00186037020, 
10, 
30,  
"",  
20)))

对于内部数组,如果其值为 0,我想更新第 9 个元素(最后(。(在给定的样本值为 20 中(。

我尝试过给出错误的代码是

val rxClaimsUpdatedtemp = rxClaimsUpdated.map(z => 
if(z._2(9).toInt == 0) z._2.updated(9,1) 
else z._2(9)
)

在下面找到我的错误

<console>:55: error: Unable to find encoder for type stored in a
Dataset.  Primitive types (Int, String, etc) and Product types 
(case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.
val rxClaimsUpdatedtemp = rxClaimsUpdated.map(z => if(z._2(9).toInt == 0) z._2.updated(9,1) else z._2(9))
^

您正在尝试使用Integer更新Array[String],因此它会引发错误。

这是你可以做的

rxClaimsUpdatedtemp.map(z => {
if (z._2(9).toInt == 0) { //check of zero 
z._2.update(9, "1")
z // update with above code and return the array
}
else z   //return default array
})

希望这有帮助!

上面的 Shankar Koirala 正确地指出了这个错误,即您正在尝试使用 Int 更新 Array[String] 的元素。 同一解决方案的另一种方法:

val rxClaimsUpdatedtemp = rxClaimsUpdated.map { elem =>
(
elem._1, 
elem._2.take(elem._2.length-1) ++ {if (elem._2.last == 0) Array("1") else Array()}}
)
}

在这里,rxClaimsUpdatetemp 的类型将与 rsClaimsUpdated 相同,因为在这里我们保留元组的第一个元素,同时更新第二个元素。

更新第二个元素的逻辑:从大小为 n 的数组中获取 n-1 个元素,并在检查数组的最后一个元素后附加空数组 Array(( 或 Array("1"(。

最新更新