Spark: how to add a field to an array of structs

Given this schema:

root
|-- Elems: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Elem: integer (nullable = true)
|    |    |-- Desc: string (nullable = true)

How can we add a new field so that the schema looks like this:

root
|-- Elems: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- New_field: integer (nullable = true)
|    |    |-- Elem: integer (nullable = true)
|    |    |-- Desc: string (nullable = true)

I have done this with a plain struct (more details at the bottom of this post), but I can't manage it with an array of structs.

Here is the code to test it:

val schema = new StructType()
  .add("Elems", ArrayType(new StructType()
    .add("Elem", IntegerType)
    .add("Desc", StringType)
  ))

val dataDS = Seq("""
{
  "Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()

val df = spark.read.schema(schema).json(dataDS.rdd)
df.show(false)
+---------------------------+
|Elems                      |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+

Once we have the DF, the best approach I have found is to create a struct of arrays, one array per field:

val mod_df = df.withColumn("modif_elems",
  struct(
    array(lit("")).as("New_field"),
    col("Elems.Elem"),
    col("Elems.Desc")
  ))
mod_df.show(false)
+---------------------------+-----------------------------+
|Elems                      |modif_elems                  |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+

mod_df.printSchema
root
|-- Elems: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Elem: integer (nullable = true)
|    |    |-- Desc: string (nullable = true)
|-- modif_elems: struct (nullable = false)
|    |-- New_field: array (nullable = false)
|    |    |-- element: string (containsNull = false)
|    |-- Elem: array (nullable = true)
|    |    |-- element: integer (containsNull = true)
|    |-- Desc: array (nullable = true)
|    |    |-- element: string (containsNull = true)

No data is lost, but this is not what I want.
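For Spark versions before 2.4 (and before `withField` in 3.1), one possible workaround (a sketch, not part of the original post; it assumes the `df` built above and the implicits of its SparkSession) is to explode the array, rebuild each struct with the extra field, and collect the array back using a generated row id:

```scala
import org.apache.spark.sql.functions._

// Sketch: tag each row, explode the array, rebuild every struct with the
// new field, then re-assemble the array per row. Note that explode drops
// rows whose Elems array is null or empty.
val rebuilt = df
  .withColumn("id", monotonically_increasing_id())
  .select($"id", explode($"Elems").as("e"))
  .select($"id", struct(lit("").as("New_field"), $"e.Elem", $"e.Desc").as("e"))
  .groupBy("id")
  .agg(collect_list("e").as("Elems"))
  .drop("id")
```

The groupBy forces a shuffle, so on large tables the arrays_zip (2.4+) or transform/withField (3.1+) approaches discussed in this post are preferable.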

Update: solution in PD1.


Bonus track: modifying a struct (not inside an array)

The code is almost the same, but now there is no array of structs, so modifying the struct is easier:

val schema = new StructType()
  .add("Elems", new StructType()
    .add("Elem", IntegerType)
    .add("Desc", StringType)
  )

val dataDS = Seq("""
{
  "Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()

val df = spark.read.schema(schema).json(dataDS.rdd)
df.show(false)
+-------+
|Elems  |
+-------+
|[1, d1]|
+-------+
df.printSchema
root
|-- Elems: struct (nullable = true)
|    |-- Elem: integer (nullable = true)
|    |-- Desc: string (nullable = true)

In this case, to add the field we need to build another struct:

val mod_df = df
  .withColumn("modif_elems",
    struct(
      lit("").alias("New_field"),
      col("Elems.Elem"),
      col("Elems.Desc")
    )
  )
mod_df.show
+-------+-----------+
|  Elems|modif_elems|
+-------+-----------+
|[1, d1]|  [, 1, d1]|
+-------+-----------+
mod_df.printSchema
root
|-- Elems: struct (nullable = true)
|    |-- Elem: integer (nullable = true)
|    |-- Desc: string (nullable = true)
|-- modif_elems: struct (nullable = false)
|    |-- New_field: string (nullable = false)
|    |-- Elem: integer (nullable = true)
|    |-- Desc: string (nullable = true)
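
On Spark 3.1 and later, this plain-struct case does not require rebuilding the struct by hand: `Column.withField` adds (or replaces) a single field in place. A minimal sketch, reusing the `df` from this section:

```scala
import org.apache.spark.sql.functions.{col, lit}

// withField keeps the existing fields and appends New_field to the struct.
val mod_df2 = df.withColumn("modif_elems", col("Elems").withField("New_field", lit("")))
```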

PD1:

OK, I have used the arrays_zip Spark SQL function (new in version 2.4.0) and it is almost what I want, but I cannot see how to change the element names (as and alias do not work here):

val mod_df = df.withColumn("modif_elems",
  arrays_zip(
    array(lit("")).as("New_field"),
    col("Elems.Elem").as("Elem"),
    col("Elems.Desc").alias("Desc")
  )
)
mod_df.show(false)
+---------------------------+---------------------------------+
|Elems                      |modif_elems                      |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+
mod_df.printSchema
root
|-- Elems: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Elem: integer (nullable = true)
|    |    |-- Desc: string (nullable = true)
|-- modif_elems: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- 0: string (nullable = true)
|    |    |-- 1: integer (nullable = true)
|    |    |-- 2: string (nullable = true)

The struct inside modif_elems should contain 3 fields named New_field, Elem and Desc, not 0, 1 and 2.

Spark 3.1+

withField can be used together with transform.

  • Scala

    Input:

    val df = spark.createDataFrame(Seq((1, "2")))
      .select(
        array(struct(
          col("_1").as("Elem"),
          col("_2").as("Desc")
        )).as("Elems")
      )
    df.printSchema()
    // root
    //  |-- Elems: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- Elem: integer (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    

    Script:

    val df2 = df.withColumn(
      "Elems",
      transform(
        $"Elems",
        x => x.withField("New_field", lit(3))
      )
    )
    df2.printSchema()
    // root
    //  |-- Elems: array (nullable = false)
    //  |    |-- element: struct (containsNull = false)
    //  |    |    |-- Elem: integer (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    //  |    |    |-- New_field: integer (nullable = false)
    
  • PySpark

    Input:

    from pyspark.sql import functions as F

    df = spark.createDataFrame([(1, "2",)]).select(
        F.array(F.struct(
            F.col("_1").alias("Elem"),
            F.col("_2").alias("Desc")
        )).alias("Elems")
    )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- Elem: long (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    

    Script:

    df = df.withColumn(
        "Elems",
        F.transform(
            F.col("Elems"),
            lambda x: x.withField("New_field", F.lit(3))
        )
    )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = false)
    #  |    |-- element: struct (containsNull = false)
    #  |    |    |-- Elem: long (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    #  |    |    |-- New_field: integer (nullable = false)
    
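The lambda passed to transform receives each array element as a struct-typed Column, so the new field does not have to be a literal: it can be derived from the element's existing fields. A hypothetical Scala sketch, assuming a `df` with the Elems schema from the Scala input above:

```scala
import org.apache.spark.sql.functions._

// Derive New_field from each element's own Elem value instead of a constant.
val df3 = df.withColumn(
  "Elems",
  transform($"Elems", x => x.withField("New_field", x.getField("Elem") * 2))
)
```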

Solution here. We need to use arrays_zip and then rename the resulting columns with the recomposed struct schema (elem_struct_recomposed), via a cast:


val elem_struct_recomposed = new StructType()
  .add("New_field", StringType)
  .add("ElemRenamed", IntegerType)
  .add("DescRenamed", StringType)

val mod_df = df
  .withColumn("modif_elems_NOT_renamed",
    arrays_zip(
      array(lit("")).as("New_field"),
      col("Elems.Elem").as("ElemRenamed"),
      col("Elems.Desc").alias("DescRenamed")
    ))
  .withColumn("modif_elems_renamed",
    $"modif_elems_NOT_renamed".cast(ArrayType(elem_struct_recomposed)))

mod_df.show(false)
mod_df.printSchema
+---------------------------+---------------------------------+---------------------------------+
|Elems                      |modif_elems_NOT_renamed          |modif_elems_renamed              |
+---------------------------+---------------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+---------------------------------+
root
|-- Elems: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Elem: integer (nullable = true)
|    |    |-- Desc: string (nullable = true)
|-- modif_elems_NOT_renamed: array (nullable = true)
|    |-- element: struct (containsNull = false)
|    |    |-- 0: string (nullable = true)
|    |    |-- 1: integer (nullable = true)
|    |    |-- 2: string (nullable = true)
|-- modif_elems_renamed: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- New_field: string (nullable = true)
|    |    |-- ElemRenamed: integer (nullable = true)
|    |    |-- DescRenamed: string (nullable = true)
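
If declaring the StructType in code is inconvenient, the same rename-by-cast can be expressed as a SQL cast over the zipped column (a sketch assuming the column names above):

```scala
import org.apache.spark.sql.functions.expr

// SQL type syntax names the struct fields directly in the cast.
val renamed = mod_df.withColumn(
  "modif_elems_renamed",
  expr("CAST(modif_elems_NOT_renamed AS array<struct<New_field:string,ElemRenamed:int,DescRenamed:string>>)")
)
```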
