Scala Spark DataFrame: explode a string column into multiple strings



Any pointers on the following?

Input DF: here col1 is of type string

+----------------------------------+
|                              col1|
+----------------------------------+
|[{a:1,g:2},{b:3,h:4},{c:5,i:6}]   |
|[{d:7,j:8},{e:9,k:10},{f:11,l:12}]|
+----------------------------------+

Expected output (col1 is again of type string):

+-------------+
|        col1 |
+-------------+
|  {a:1,g:2}  |
|  {b:3,h:4}  |
|  {c:5,i:6}  |
|  {d:7,j:8}  |
|  {e:9,k:10} |
|  {f:11,l:12}|
+-------------+

Thanks!

You can use the Spark SQL explode function together with a UDF:

import spark.implicits._
val df = spark.createDataset(Seq("[{a},{b},{c}]","[{d},{e},{f}]")).toDF("col1")
df.show()
+-------------+
|         col1|
+-------------+
|[{a},{b},{c}]|
|[{d},{e},{f}]|
+-------------+
import org.apache.spark.sql.functions._
val stringToSeq = udf{s: String => s.drop(1).dropRight(1).split(",")}
df.withColumn("col1", explode(stringToSeq($"col1"))).show()
+----+
|col1|
+----+
| {a}|
| {b}|
| {c}|
| {d}|
| {e}|
| {f}|
+----+

Edit: for the new input data, the custom UDF above can evolve into:

val stringToSeq = udf { s: String =>
  // extract each key:value group and wrap it back in curly braces
  val extractor = "[^{]*:[^}]*".r
  extractor.findAllIn(s).map(m => s"{$m}").toSeq
}
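For completeness, here is a rough sketch of applying the evolved UDF to the input from the question (the DataFrame name df2 is just an assumption for illustration; the imports may already be in scope):

import spark.implicits._
import org.apache.spark.sql.functions._

// hypothetical DataFrame holding the question's input strings
val df2 = spark.createDataset(Seq(
  "[{a:1,g:2},{b:3,h:4},{c:5,i:6}]",
  "[{d:7,j:8},{e:9,k:10},{f:11,l:12}]"
)).toDF("col1")

// same pattern as before: explode the Seq[String] returned by the UDF
df2.withColumn("col1", explode(stringToSeq($"col1"))).show()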

New output:

+-----------+
|       col1|
+-----------+
|  {a:1,g:2}|
|  {b:3,h:4}|
|  {c:5,i:6}|
|  {d:7,j:8}|
| {e:9,k:10}|
|{f:11,l:12}|
+-----------+

Spark provides quite a rich trim function that can be used to remove leading and trailing characters, [] in your case. As @LeoC already mentioned, the required functionality can be implemented with built-in functions, which will perform much better:

import org.apache.spark.sql.functions.{trim, explode, split}
import spark.implicits._

val df = Seq(
  "[{a},{b},{c}]",
  "[{d},{e},{f}]"
).toDF("col1")

df.select(
  explode(
    split(
      trim($"col1", "[]"), ","))).show
// +---+
// |col|
// +---+
// |{a}|
// |{b}|
// |{c}|
// |{d}|
// |{e}|
// |{f}|
// +---+

Edit:

For the new dataset, the logic stays the same, except that you need to split on a character other than ,. You can achieve this by using regexp_replace to replace }, with }| so that you can later split on | instead of ,:

import org.apache.spark.sql.functions.{trim, explode, split, regexp_replace}
import spark.implicits._

val df = Seq(
  "[{a:1,g:2},{b:3,h:4},{c:5,i:6}]",
  "[{d:7,j:8},{e:9,k:10},{f:11,l:12}]"
).toDF("col1")

df.select(
  explode(
    split(
      regexp_replace(trim($"col1", "[]"), "},", "}|"), // gives: {a:1,g:2}|{b:3,h:4}|{c:5,i:6}
      "\\|")
  )
).show(false)
// +-----------+
// |col        |
// +-----------+
// |{a:1,g:2}  |
// |{b:3,h:4}  |
// |{c:5,i:6}  |
// |{d:7,j:8}  |
// |{e:9,k:10} |
// |{f:11,l:12}|
// +-----------+

Note: with split(..., "\\|") we escape |, which is a special regex character.
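As a side note, a character class works just as well and avoids worrying about string escaping; a minimal sketch using the same df and col1 as above:

df.select(
  explode(
    split(
      regexp_replace(trim($"col1", "[]"), "},", "}|"),
      "[|]"))  // [|] matches a literal | without needing a backslash
).show(false)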

You could do:

import spark.implicits._
val newDF = df.as[String].flatMap(line => line.replaceAll("\\[", "").replaceAll("\\]", "").split(","))
newDF.show()

Output:

+-----+
|value|
+-----+
|  {a}|
|  {b}|
|  {c}|
|  {d}|
|  {e}|
|  {f}|
+-----+

Note that this process names the output column value; you can easily rename it (if needed) using select, withColumn, etc.
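For example, a small sketch of renaming it back to col1 (nothing here beyond standard DataFrame methods):

// rename the exploded column from the default "value" back to "col1"
val renamedDF = newDF.withColumnRenamed("value", "col1")
renamedDF.show()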

What finally worked:

import spark.implicits._
import org.apache.spark.sql.functions._

val df = spark.createDataset(Seq(
  "[{a:1,g:2},{b:3,h:4},{c:5,i:6}]",
  "[{d:7,j:8},{e:9,k:10},{f:11,l:12}]"
)).toDF("col1")
df.show()

// split on the "},{" boundaries, then re-add the braces after exploding
val toStr = udf((value: String) => value.split("\\},\\{").map(_.toString))
val addParanthesis = udf((value: String) => "{" + value + "}")
val removeParanthesis = udf((value: String) => value.slice(2, value.length() - 2))

df
  .withColumn("col0", removeParanthesis(col("col1")))
  .withColumn("col2", toStr(col("col0")))
  .withColumn("col3", explode(col("col2")))
  .withColumn("col4", addParanthesis(col("col3")))
  .show()

Output:

+--------------------+--------------------+--------------------+---------+-----------+
|                col1|                col0|                col2|     col3|       col4|
+--------------------+--------------------+--------------------+---------+-----------+
|[{a:1,g:2},{b:3,h...|a:1,g:2},{b:3,h:4...|[a:1,g:2, b:3,h:4...|  a:1,g:2|  {a:1,g:2}|
|[{a:1,g:2},{b:3,h...|a:1,g:2},{b:3,h:4...|[a:1,g:2, b:3,h:4...|  b:3,h:4|  {b:3,h:4}|
|[{a:1,g:2},{b:3,h...|a:1,g:2},{b:3,h:4...|[a:1,g:2, b:3,h:4...|  c:5,i:6|  {c:5,i:6}|
|[{d:7,j:8},{e:9,k...|d:7,j:8},{e:9,k:1...|[d:7,j:8, e:9,k:1...|  d:7,j:8|  {d:7,j:8}|
|[{d:7,j:8},{e:9,k...|d:7,j:8},{e:9,k:1...|[d:7,j:8, e:9,k:1...| e:9,k:10| {e:9,k:10}|
|[{d:7,j:8},{e:9,k...|d:7,j:8},{e:9,k:1...|[d:7,j:8, e:9,k:1...|f:11,l:12|{f:11,l:12}|
+--------------------+--------------------+--------------------+---------+-----------+
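If only the final column is needed, the intermediate columns can be dropped at the end; a minimal sketch building on the snippet above:

// keep only the re-wrapped column and rename it back to col1
df
  .withColumn("col0", removeParanthesis(col("col1")))
  .withColumn("col2", toStr(col("col0")))
  .withColumn("col3", explode(col("col2")))
  .withColumn("col4", addParanthesis(col("col3")))
  .select(col("col4").as("col1"))
  .show()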
