Spark - split string column when one part contains the delimiter



I have a CSV file with two string columns (term, code). The code column has a special format [num]-[two_letters]-[text], where the text part can itself contain dashes -. I want to read this file with Spark into a DataFrame with exactly four columns (term, num, letters, text).

Input
+---------------------------------+
|  term  |          code          |
+---------------------------------+
| term01 |    12-AB-some text     |
| term02 | 130-CD-some-other-text |
+---------------------------------+

Output
+------------------------------------------+
|  term  | num | letters |       text      |
+------------------------------------------+
| term01 | 12  |   AB    |   some text     |
| term02 | 130 |   CD    | some-other-text |
+------------------------------------------+

I can split the code column into three columns when the text part contains no dashes, but how do I write a solution that handles all cases (something like taking all of the text after exactly two dashes)? The code for splitting a column into three is well explained in the answer here.

Here is one option using regexp_extract:

import org.apache.spark.sql.functions.regexp_extract
import spark.implicits._ // provided automatically in spark-shell

val df = Seq(("term01", "12-AB-some text"), ("term02", "130-CD-some-other-text")).toDF("term", "code")
// define the pattern that matches the string column
val p = "([0-9]+)-([a-zA-Z]{2})-(.*)"
// p: String = ([0-9]+)-([a-zA-Z]{2})-(.*)
// define the map from new column names to the group index in the pattern
val cols = Map("num" -> 1, "letters" -> 2, "text" -> 3)
// cols: scala.collection.immutable.Map[String,Int] = Map(num -> 1, letters -> 2, text -> 3)
// create the new columns on data frame
cols.foldLeft(df){ 
    case (df, (colName, groupIdx)) => df.withColumn(colName, regexp_extract($"code", p, groupIdx)) 
}.drop("code").show
+------+---+-------+---------------+
|  term|num|letters|           text|
+------+---+-------+---------------+
|term01| 12|     AB|      some text|
|term02|130|     CD|some-other-text|
+------+---+-------+---------------+
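Before running this on a DataFrame, the pattern can be sanity-checked in plain Scala (a quick sketch using the same regex as above; no Spark needed):

```scala
// Same pattern as above; the trailing (.*) is greedy, so it
// captures everything after the second dash, dashes included.
val p = "([0-9]+)-([a-zA-Z]{2})-(.*)".r

"130-CD-some-other-text" match {
  case p(num, letters, text) =>
    println(s"num=$num letters=$letters text=$text")
    // prints "num=130 letters=CD text=some-other-text"
}
```

When matched against a whole string like this, a Scala Regex is anchored, so a code value that does not follow the [num]-[two_letters]-[text] shape simply fails to match (and regexp_extract would return an empty string for each group).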

Here is how I did it using a udf:
import org.apache.spark.sql.functions.udf

case class MyData(num: Int, letters: String, text: String)
def udfSplit = udf(
  (input: String) => { 
    val res = input.split("-", 3) // limit=3 => pattern applied at most n - 1 times
    MyData(res(0).toInt, res(1), res(2))
  }
)
val df = spark.createDataFrame(
    Seq(
      ("term01", "12-AB-some text"), 
      ("term02", "130-CD-some-other-text")
    )
).toDF("term", "code")
df.show(false)
+------+----------------------+
|term  |code                  |
+------+----------------------+
|term01|12-AB-some text       |
|term02|130-CD-some-other-text|
+------+----------------------+
val res = df.withColumn("code", udfSplit($"code"))
res.show(false)
+------+------------------------+
|term  |code                    |
+------+------------------------+
|term01|[12,AB,some text]       |
|term02|[130,CD,some-other-text]|
+------+------------------------+
res.printSchema
root
 |-- term: string (nullable = true)
 |-- code: struct (nullable = true)
 |    |-- num: integer (nullable = false)
 |    |-- letters: string (nullable = true)
 |    |-- text: string (nullable = true)
res.select("term", "code.*").show(false)
+------+---+-------+---------------+
|term  |num|letters|text           |
+------+---+-------+---------------+
|term01|12 |AB     |some text      |
|term02|130|CD     |some-other-text|
+------+---+-------+---------------+
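The udf above hinges on the limit argument of String.split: with limit = 3 the delimiter is applied at most twice, so any dashes after the second one stay inside the last element. A quick plain-Scala check of just the split, outside Spark:

```scala
// limit = 3 => the delimiter is applied at most twice;
// remaining dashes are kept in the final element
val parts = "130-CD-some-other-text".split("-", 3)
println(parts.mkString(" | "))
// prints "130 | CD | some-other-text"
```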
