Expanding row content that holds a string of key=value pairs into matching columns



Requirement
Using Spark's Scala API, expand a column read from a Parquet source, in which each row stores a variable-length string of key=value pairs, into one column per key holding the corresponding value.

Example
Input

+----------+-------------------------------------+
| identity | Original                            |
+----------+-------------------------------------+
| 1        | key1=value1&key2=value2             |
+----------+-------------------------------------+
| 2        | key2=value2&key3=value3&key7=value7 |
+----------+-------------------------------------+

Output

+----------+-------------------------------------+--------+--------+--------+--------+
| identity | Original                            | key1   | key2   | key3   | key7   |
+----------+-------------------------------------+--------+--------+--------+--------+
| 1        | key1=value1&key2=value2             | value1 | value2 |        |        |
+----------+-------------------------------------+--------+--------+--------+--------+
| 2        | key2=value2&key3=value3&key7=value7 |        | value2 | value3 | value7 |
+----------+-------------------------------------+--------+--------+--------+--------+

My progress
By reading several posts here I have made it to the last step. Here is what I have tried so far to meet the requirement:

  1. Collect the distinct keys that appear in Original

    import spark.implicits._

    val base = spark.read.parquet("path.of.parquet")
    val aggregationKeys = base.select($"Original").rdd.map {
      observation =>
        // A Row prints as "[key1=value1&key2=value2]": strip the brackets, then split the pairs
        val immediate = observation.toString.replaceAll("""[\[\]]""", "").split("&")
        immediate.map(_.split("=")(0))
    }.collect.flatMap(y => y).sorted.distinct
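    // For the sample input above this yields Array("key1", "key2", "key3", "key7")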
    
  2. Create new columns based on the keys

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.lit

    val aggregationKeysString = aggregationKeys.mkString(",")
    val keysFields = aggregationKeysString.split(",")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val keysSchema = StructType(keysFields)
    // Empty DataFrame whose schema carries one StringType column per key
    val keysColumns = spark.createDataFrame(
      spark.sparkContext.emptyRDD[Row], keysSchema
    ).withColumn("identity", lit(0))
    val transformedBase = base.join(keysColumns, Seq("identity"), "left_outer")
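    // Because keysColumns has no rows, this left join only appends the key columns as nulls to every row of base; filling in the actual values is step 3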
    
  3. [Stuck] Some Scala code that, for each key present in Original, writes its value into the corresponding column created in step 2, as shown in the Example output. My idea is to obtain the collection of key-value pairs from each row's Original and then push every value into its matching column.

How do I accomplish step 3? And, considering performance, is there a better way to meet the requirement? The number of keys may run into the hundreds.

A related question I found on this site after posting solved the point I was stuck on.

Solution

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import spark.implicits._

val base = spark.read.parquet("path.of.parquet")
//Get the whole set of keys in the form of an array
val aggregationKeys = base.select($"Original").rdd.map {
  observation =>
    val immediate = observation.toString.replaceAll("""[\[\]]""", "").split("&")
    immediate.map(_.split("=")(0))
}.collect.flatMap(y => y).sorted.distinct
//Keep the non-empty key names as the new column names
val columnNames: Seq[String] = aggregationKeys.filter(_.nonEmpty).toSeq
//UDF that parses "k1=v1&k2=v2&..." into a Map[String, String]
val mapifyValue = udf[Map[String, String], String] {
  s => s.split("&").map(_.split("=")).map {
    case Array(k, v) => k -> v
  }.toMap
}
//Get the result as the output of Example portrays
val stringAsMap = base.withColumn("mapifiedOriginal", mapifyValue($"Original"))
val ultimateResult: DataFrame = columnNames.foldLeft(stringAsMap) {
  case (df, colName) =>
    df.withColumn(colName, $"mapifiedOriginal".getItem(colName))
}.drop("mapifiedOriginal")
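
Design note, for the performance question: the sketch below (untested by me, using the same hypothetical parquet path and the Original column from the example) relies on Spark's built-in str_to_map expression instead of a hand-written UDF, and gathers the distinct key names with map_keys plus explode rather than dropping to the RDD API.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, explode, expr, map_keys}

    val base = spark.read.parquet("path.of.parquet")
    // Parse "k1=v1&k2=v2&..." into a MapType column with the built-in SQL function
    val withMap = base.withColumn("kv", expr("str_to_map(Original, '&', '=')"))
    // One pass over the data to learn the distinct key names
    val keys: Seq[String] = withMap
      .select(explode(map_keys(col("kv"))).as("k"))
      .distinct()
      .collect()
      .map(_.getString(0))
      .sorted
      .toSeq
    // Promote each key to its own column; absent keys come out as null
    val result: DataFrame = keys
      .foldLeft(withMap)((df, k) => df.withColumn(k, col("kv").getItem(k)))
      .drop("kv")

With hundreds of keys, building all the new columns in a single select(...) rather than hundreds of chained withColumn calls should also keep the query plan smaller, though I have not measured the difference.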

C'est la vie.
