在 Spark Scala 中使用不同的分隔符将字符串数组转换为字符串



我想将数据帧中的字符串数组转换为具有与逗号不同的分隔符的字符串,同时删除数组括号。我希望将","替换为";#"。这是为了避免内部可能具有","的元素,因为它是自由格式的文本字段。我正在使用火花 1.6。

示例如下:

图式:

root
|-- carLineName: array (nullable = true)
|    |-- element: string (containsNull = true)

作为数据帧输入:

+--------------------+
|carLineName         |
+--------------------+
|[Avalon,CRV,Camry]  |
|[Model T, Model S]  |
|[Cayenne, Mustang]  |
|[Pilot, Jeep]       |

期望输出:

+--------------------+
|carLineName         |
+--------------------+
|Avalon;#CRV;#Camry  | 
|Model T;#Model S    |
|Cayenne;#Mustang    |
|Pilot;# Jeep        |

生成上述输入的当前代码:

val newCarDf = carDf.select(col("carLineName").cast("String").as("carLineName"))

您可以使用本机函数array_join(它从 Spark 2.4 开始可用(:

import org.apache.spark.sql.functions.{array_join}
val l = Seq(Seq("Avalon","CRV","Camry"), Seq("Model T", "Model S"), Seq("Cayenne", "Mustang"), Seq("Pilot", "Jeep"))
val df = l.toDF("carLineName")
df.withColumn("str", array_join($"carLineName", ";#")).show()
+--------------------+------------------+
|         carLineName|               str|
+--------------------+------------------+
|[Avalon, CRV, Camry]|Avalon;#CRV;#Camry|
|  [Model T, Model S]|  Model T;#Model S|
|  [Cayenne, Mustang]|  Cayenne;#Mustang|
|       [Pilot, Jeep]|       Pilot;#Jeep|
+--------------------+------------------+

您可以创建一个用户定义的函数,该函数将元素与"#;"分隔符连接起来,如以下示例所示:

val df1  = Seq(
("1", Array("t1", "t2")),
("2", Array("t1", "t3", "t5"))
).toDF("id", "arr")
import org.apache.spark.sql.functions.{col, udf}
def formatString: Seq[String] => String = x => x.reduce(_ ++ "#;" ++ _)
def udfFormat = udf(formatString)
df1.withColumn("formatedColumn", udfFormat(col("arr")))

+---+------------+----------+
| id|         arr|  formated|
+---+------------+----------+
|  1|    [t1, t2]|    t1#;t2|
|  2|[t1, t3, t5]|t1#;t3#;t5|
+---+------------+----------+

您可以简单地编写一个用户定义的函数 udf,它将接受字符串数组作为输入参数。在 udf 内部,任何操作都可以在数组上执行。

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
def toCustomString: UserDefinedFunction = udf((carLineName: Seq[String]) => {
carLineName.mkString(";#")
})
val newCarDf = df.withColumn("carLineName", toCustomString(df.col("carLineName")))

可以通过传递分隔符作为第二个参数来进一步使此 udf 泛型。

import org.apache.spark.sql.functions.lit
def toCustomStringWithDelimiter: UserDefinedFunction = udf((carLineName: Seq[String], delimiter: String) => {
carLineName.mkString(delimiter)
})
val newCarDf = df.withColumn("carLineName", toCustomStringWithDelimiter(df.col("carLineName"), lit(";#")))

由于您使用的是1.6,我们可以将Row映射到WrappedArray

事情是这样的。

输入

scala> val carLineDf = Seq( (Array("Avalon","CRV","Camry")),
|                   (Array("Model T", "Model S")),
|                   (Array("Cayenne", "Mustang")),
|                   (Array("Pilot", "Jeep"))
|                   ).toDF("carLineName")
carLineDf: org.apache.spark.sql.DataFrame = [carLineName: array<string>]

架构::

scala> carLineDf.printSchema
root
|-- carLineName: array (nullable = true)
|    |-- element: string (containsNull = true)

然后我们只需使用Row.getAs来获取字符串的包装数组而不是 Row 对象,我们可以使用通常的 scala 内置操作:

scala> import scala.collection.mutable.WrappedArray
import scala.collection.mutable.WrappedArray
scala> carLineDf.map( row => row.getAs[WrappedArray[String]](0)).map( a => a.mkString(";#")).toDF("carLineNameAsString").show(false)
+-------------------+
|carLineNameAsString|
+-------------------+
|Avalon;#CRV;#Camry |
|Model T;#Model S   |
|Cayenne;#Mustang   |
|Pilot;#Jeep        |
+-------------------+
// Even an easier alternative
carLineDf.map( row => row.getAs[WrappedArray[String]](0)).map( r => r.reduce(_+";#"+_)).show(false)

就是这样。您可能必须使用 dataframe.rdd,否则应该这样做。

最新更新