使用Spark DataFrame中的多列更改行的值



我得到了这种格式的数据帧(df)。

df.show()
********************
X1 | x2  | X3 | ..... | Xn   | id_1 | id_2 | .... id_23
1  |  ok |good|         john | null | null |     |null
2  |rick |good|       | ryan | null | null |     |null
....

我得到了一个数据帧,其中有很多列,数据帧名为df。我需要编辑这个数据帧(df)中的列。我有两个映射,m1(Integer->Integer)和m2(Integer-->String)映射。

我需要查看每一行,取X1列的值,看到X1在m1中的映射值,它将在[1,23]的范围内,设为5,还可以找到X1在m2中的映射的值,这将类似于X8。我需要将X8列的值添加到id_5中。我有以下代码,但我无法使其工作。

val dfEdited = df.map( (row) => {
val mapValue = row.getAs("X1")
row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue)
})

您在row.getAs("id_"+m1.get(mapValue)) = row.getAs(m2.get(mapValue)中所做的工作没有意义。

首先,您要为操作getAs("id_"+m1.get(mapValue))的结果分配一个值,这会给您一个不可变的值。其次,您没有正确使用方法getAs,因为您需要指定该方法返回的数据类型。

我不确定我是否正确理解了你想做的事情,我想你遗漏了一些细节。不管怎样,这是我得到的,效果很好。

当然,我已经评论了每一行代码,这样你就可以很容易地理解它

// First of all we need to create a case class to wrap the content of each row.
case class Schema(X1: Int, X2: String, X3: String, X4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String])

val dfEdited = ds.map( row => {
// We use the getInt method to get the value of a field which is expected to be Int
val mapValue = row.getInt(row.fieldIndex("X1"))
// fieldIndex gives you the position inside the row fo the field you are looking for. 
// Regarding m1(mapValue), NullPointer might be thrown if mapValue is not in that Map. 
// You need to implement mechanisms to deal with it (for example, an if...else clause, or using the method getOrElse)
val indexToModify = row.fieldIndex("id_" + m1(mapValue)) 
// We convert the row to a sequence, and pair each element with its index.
// Then, with the map method we generate a new sequence.
// We replace the element situated in the position indexToModify.
// In addition, if there are null values, we have to convert it to an object of type Option.
// It is necessary for the next step.
val seq = row.toSeq.zipWithIndex.map(x => if (x._2 == indexToModify) Some(m2(mapValue)) else if(x._1 == null) None else x._1)

// Finally, you have to create the Schema object by using pattern matching
seq match {
case Seq(x1: Int, x2: String, x3: String, x4: String, id_1: Option[String], id_2: Option[String], id_3: Option[String]) => Schema(x1, x2,x3,x4, id_1, id_2, id_3)
}
})

一些评论:

  • ds对象是一个数据集。数据集必须有一个结构。您不能修改map方法内部的行并返回它们,因为Spark不知道数据集的结构是否发生了变化。出于这个原因,我返回了一个case类对象,因为它为Dataset对象提供了一个结构。

  • 请记住,空值可能会出现问题。如果您不建立机制来处理例如X1的值不在m1中的情况,则此代码可能会抛出空指针。

希望它能起作用。

相关内容

  • 没有找到相关文章

最新更新