Spark 生成包含 (SQL LIKE) 字符串的列名列表



下面是一个简单的语法,用于在特定列中搜索字符串 uisng SQL Like 功能。

val dfx = df.filter($"name".like(s"%${productName}%"))

问题是我如何获取在其 VALUES 中包含特定字符串的每一列 NAME,并生成一个新列,其中包含每行的"名称"列表。

到目前为止,这是我采取的方法,但由于我无法在 UDF 中使用 spark-sql"喜欢"函数而卡住了。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import spark.implicits._
val df1 = Seq(
  (0, "mango", "man", "dit"), 
  (1, "i-man", "man2", "mane"),
  (2, "iman", "mango", "ho"),
  (3, "dim",  "kim", "sim")
).toDF("id", "col1", "col2", "col3")
val df2 = df1.columns.foldLeft(df1) {
  (acc: DataFrame, colName: String) =>
    acc.withColumn(colName, concat(lit(colName + "="), col(colName)))
}
val df3 = df2.withColumn("merged_cols", split(concat_ws("X",  df2.columns.map(c=> col(c)):_*), "X"))

下面是一个示例输出。请注意,这里只有 3 列,但在实际工作中,我将读取多个可以包含动态列数的表。

+--------------------------------------------+
|id  |   col1|  col2|  col3|      merged_cols
+--------------------------------------------+
  0  |  mango| man  |  dit | col1, col2
  1  |  i-man| man2 | mane | col1, col2, col3
  2  |  iman | mango| ho   | col1, col2
  3  |   dim |  kim |   sim| 
+--------------------------------------------+

这可以使用列上的foldLeft以及whenotherwise来完成:

val e = "%man%"
val df2 = df1.columns.foldLeft(df.withColumn("merged_cols", lit(""))){(df, c) => 
    df.withColumn("merged_cols", when(col(c).like(e), concat($"merged_cols", lit(s"$c,"))).otherwise($"merged_cols"))}
  .withColumn("merged_cols", expr("substring(merged_cols, 1, length(merged_cols)-1)"))

满足条件e的所有列都将追加到merged_cols列中的字符串中。请注意,该列必须存在,第一个追加才能正常工作,因此在发送到foldLeft时将其添加到数据帧(包含空字符串)。

代码中的最后一行只是删除最后添加的额外,。如果您希望将结果改为数组,只需添加.withColumn("merged_cols", split($"merged_cols", ","))即可。


另一种选择是改用UDF。在处理许多列时,这可能是首选,因为foldLeft将创建多个数据帧副本。这里使用正则表达式(不像 SQL,因为它对整个列进行操作)。

val e = ".*man.*"
val concat_cols = udf((vals: Seq[String], names: Seq[String]) => {
  vals.zip(names).filter{case (v, n) => v.matches(e)}.map(_._2)
})
val df2 = df.withColumn("merged_cols", concat_cols(array(df.columns.map(col(_)): _*), typedLit(df.columns.toSeq)))

注意typedLit可以在 Spark 版本 2.2+ 中使用,当使用旧版本时,请改用array(df.columns.map(lit(_)): _*)

相关内容

  • 没有找到相关文章

最新更新