Spark Scala: select certain columns of a DataFrame as a map



I have a DataFrame df and a list of column names that should be selected from that DataFrame as a map.

I tried the following approach to build the map:

import org.apache.spark.sql.functions.{col, lit, map}
import spark.implicits._  // assuming a SparkSession named spark, e.g. in spark-shell

var df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
val cols = List("from_value","to_value")

df.select(
  map(
    lit(cols(0)), col(cols(0)),
    lit(cols(1)), col(cols(1))
  ).as("mapped")
).show(false)

Output:

+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+

However, I see a few problems with this approach, such as:

  • The list of column names may contain 0 to at most 3 column names. The code above would throw an IndexOutOfBoundsException.
  • The order in which the column names appear in the map matters; I need the keys of the map to preserve that order.
  • Column values can be null and need to be coalesced to an empty string.
  • The columns specified in the list may not exist in df.

Is there an elegant way to handle the above scenarios without being too verbose?

You can select certain columns of a DataFrame as a map by using the following function, mappingExpr:

import org.apache.spark.sql.functions.{col, lit, map, when}
import org.apache.spark.sql.{Column, DataFrame}

def mappingExpr(columns: Seq[String], dataframe: DataFrame): Column = {
  // Replace a null column value with an empty string
  def getValue(columnName: String): Column =
    when(col(columnName).isNull, lit("")).otherwise(col(columnName))

  map(
    columns
      .filter(dataframe.columns.contains)  // ignore names not present in the dataframe
      .flatMap(columnName => Seq(lit(columnName), getValue(columnName))): _*
  ).as("mapped")
}

So, given your example data:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
> 
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+

Detailed explanation

The main idea of my function is to transform the list of columns into a list of pairs, where the first element of each pair contains the column name as a column and the second element contains the column value as a column. I then flatten this list of pairs and pass the result to the map Spark SQL function.
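For illustration, here is a rough sketch (reusing the imports from mappingExpr above, and inlining getValue) of the alternating key/value argument list that the filter + flatMap step builds for cols = List("from_value", "to_value"):

// Illustration only, not part of the answer's code.
val args: Seq[Column] =
  List("from_value", "to_value")
    .flatMap(name => Seq(lit(name), when(col(name).isNull, lit("")).otherwise(col(name))))

// args is Seq(lit("from_value"), <from_value expr>, lit("to_value"), <to_value expr>),
// so map(args: _*) builds {"from_value" -> ..., "to_value" -> ...} for every row.
df.select(map(args: _*).as("mapped")).show(false)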

Now, let's go through your different constraints.

The list of column names may contain 0 to 3 column names

Since I build the elements that go into the map by iterating over the list of columns, the number of column names does not change anything. If we pass an empty list of column names, there is no error:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List()
>
> df.select(mappingExpr(List(), df)).show(false)
+------+
|mapped|
+------+
|[]    |
|[]    |
|[]    |
|[]    |
|[]    |
|[]    |
+------+

I need the keys of the map to preserve order

This is the trickiest one. Usually, when you create a map, the order is not preserved, due to how maps are implemented. In Spark, however, the order does appear to be preserved, so it simply follows the order of the column names in the list. So, in your example, if we change the order of the column names:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("to_value","from_value")
> 
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped                              |
+------------------------------------+
|[to_value -> xyz1, from_value -> 66]|
|[to_value -> abc1, from_value -> 67]|
|[to_value -> fgr1, from_value -> 68]|
|[to_value -> yte1, from_value -> 69]|
|[to_value -> erx1, from_value -> 70]|
|[to_value -> ter1, from_value -> 71]|
+------------------------------------+
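If you want to double-check the key order, one way (a minimal sketch, not part of the original answer) is to look at the keys with Spark's map_keys function, which returns them in the order they appear in the map:

import org.apache.spark.sql.functions.map_keys

// The keys come back in the same order as in `cols`, here List("to_value", "from_value").
df.select(mappingExpr(cols, df))
  .select(map_keys(col("mapped")).as("keys"))
  .show(false)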

Column values can be null and need to be coalesced to an empty string

I handle this in the inner function getValue with Spark's when SQL function: when the column value is null, an empty string is returned, otherwise the column value is returned: when(col(columnName).isNull, lit("")).otherwise(col(columnName)). So whenever there is a null value in the dataframe, it is replaced with an empty string:

> val df = Seq((66, null,"a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("from_value","to_value")
> 
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> ]    |
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
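As a side note, the same null-to-empty-string behaviour could also be expressed with Spark's coalesce function instead of when/otherwise. A minimal sketch of such an alternative getValue, with an explicit string cast since all map values end up sharing one type:

import org.apache.spark.sql.functions.coalesce

// Alternative sketch, not the answer's code: cast to string, then replace null with "".
def getValueAlt(columnName: String): Column =
  coalesce(col(columnName).cast("string"), lit(""))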

The columns specified in the list may not exist in the dataframe

You can retrieve the list of a dataframe's columns with its columns method. I use it to filter out every column name that is not present in the dataframe, with .filter(dataframe.columns.contains). So when the list of column names contains a name that does not exist in the dataframe, it is simply ignored:

> val df = Seq((66, "xyz1","a"),(67, "abc1","a"),(68, "fgr1","b"),(69, "yte1","d"),(70, "erx1","q"),(71, "ter1","q")).toDF("from_value", "to_value","label")
> val cols = List("a_column_that_does_not_exist", "from_value","to_value")
> 
> df.select(mappingExpr(cols, df)).show(false)
+------------------------------------+
|mapped                              |
+------------------------------------+
|[from_value -> 66, to_value -> xyz1]|
|[from_value -> 67, to_value -> abc1]|
|[from_value -> 68, to_value -> fgr1]|
|[from_value -> 69, to_value -> yte1]|
|[from_value -> 70, to_value -> erx1]|
|[from_value -> 71, to_value -> ter1]|
+------------------------------------+
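If silently dropping unknown column names is not the behaviour you want, a hypothetical stricter variant (not part of the original answer) could fail fast instead:

// Hypothetical variant: reject the call if any requested column is missing,
// then delegate to the mappingExpr defined above.
def mappingExprStrict(columns: Seq[String], dataframe: DataFrame): Column = {
  val missing = columns.filterNot(dataframe.columns.contains)
  require(missing.isEmpty, s"Columns not found in dataframe: ${missing.mkString(", ")}")
  mappingExpr(columns, dataframe)
}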
