Given this schema:
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
How can we add a new field, like this?
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- New_field: integer (nullable = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
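I have already done this with a simple struct (more details at the bottom of this post), but I can't manage it with an array of structs.
Here is the code to test it:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._  // already in scope in spark-shell; needed for toDS() and $"..."

val schema = new StructType()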
.add("Elems", ArrayType(new StructType()
.add("Elem", IntegerType)
.add("Desc", StringType)
))
val dataDS = Seq("""
{
"Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()
val df = spark.read.schema(schema).json(dataDS)  // pass the Dataset[String] directly; json(RDD[String]) is deprecated
df.show(false)
+---------------------------+
|Elems |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+
Once we have the DF, the best approach I found was to build a struct that holds one array per field:
val mod_df = df.withColumn("modif_elems",
struct(
array(lit("")).as("New_field"),
col("Elems.Elem"),
col("Elems.Desc")
))
mod_df.show(false)
+---------------------------+-----------------------------+
|Elems |modif_elems |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+
mod_df.printSchema
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
|-- modif_elems: struct (nullable = false)
| |-- New_field: array (nullable = false)
| | |-- element: string (containsNull = false)
| |-- Elem: array (nullable = true)
| | |-- element: integer (containsNull = true)
| |-- Desc: array (nullable = true)
| | |-- element: string (containsNull = true)
We haven't lost any data, but this is not what I want: the result is a struct of arrays, not an array of structs.
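To make the difference concrete, here is a minimal plain-Python model (no Spark involved; lists stand in for arrays and dicts for structs). It assumes that selecting a field through an array of structs, as col("Elems.Elem") does, extracts that field from every element, which is what the printed schema above suggests. The helper name extract_field is hypothetical.

```python
# Plain-Python stand-in for the single row of df: an array of structs.
elems = [
    {"Elem": 1, "Desc": "d1"},
    {"Elem": 2, "Desc": "d2"},
    {"Elem": 3, "Desc": "d3"},
]

def extract_field(array_of_structs, name):
    """Model of col("Elems.<name>"): pull one field out of every element."""
    return [s[name] for s in array_of_structs]

# What the struct(...) call builds: one struct holding three arrays.
struct_of_arrays = {
    "New_field": [""],
    "Elem": extract_field(elems, "Elem"),
    "Desc": extract_field(elems, "Desc"),
}

# What the question asks for: the same array, with one extra field per element.
array_of_structs = [{"New_field": "", **s} for s in elems]

print(struct_of_arrays)
# {'New_field': [''], 'Elem': [1, 2, 3], 'Desc': ['d1', 'd2', 'd3']}
print(array_of_structs[0])
# {'New_field': '', 'Elem': 1, 'Desc': 'd1'}
```

The first shape matches modif_elems in the schema above; the second is the target shape, where the new field lives inside each element.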
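Update: solution in PD1.
Bonus track: modifying a struct (not in an array)
The code is almost the same, but now we don't have an array of structs, so modifying the struct is easier: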
val schema = new StructType()
.add("Elems", new StructType()
.add("Elem", IntegerType)
.add("Desc", StringType)
)
val dataDS = Seq("""
{
"Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()
val df = spark.read.schema(schema).json(dataDS)  // pass the Dataset[String] directly; json(RDD[String]) is deprecated
df.show(false)
+-------+
|Elems |
+-------+
|[1, d1]|
+-------+
df.printSchema
root
|-- Elems: struct (nullable = true)
| |-- Elem: integer (nullable = true)
| |-- Desc: string (nullable = true)
In this case, to add the field, we need to create another struct:
val mod_df = df
.withColumn("modif_elems",
struct(
lit("").alias("New_field"),
col("Elems.Elem"),
col("Elems.Desc")
)
)
mod_df.show
+-------+-----------+
| Elems|modif_elems|
+-------+-----------+
|[1, d1]| [, 1, d1]|
+-------+-----------+
mod_df.printSchema
root
|-- Elems: struct (nullable = true)
| |-- Elem: integer (nullable = true)
| |-- Desc: string (nullable = true)
|-- modif_elems: struct (nullable = false)
| |-- New_field: string (nullable = false)
| |-- Elem: integer (nullable = true)
| |-- Desc: string (nullable = true)
PD1:
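OK, I have used the arrays_zip Spark SQL function (new in version 2.4.0), and it is almost what I want, but I can't see how to change the element names (neither as nor alias works here):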
val mod_df = df.withColumn("modif_elems",
arrays_zip(
array(lit("")).as("New_field"),
col("Elems.Elem").as("Elem"),
col("Elems.Desc").alias("Desc")
)
)
mod_df.show(false)
+---------------------------+---------------------------------+
|Elems |modif_elems |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+
mod_df.printSchema
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
|-- modif_elems: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- 0: string (nullable = true)
| | |-- 1: integer (nullable = true)
| | |-- 2: string (nullable = true)
The modif_elems struct should contain 3 fields named New_field, Elem and Desc, not 0, 1 and 2.
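A minimal plain-Python sketch of what arrays_zip appears to do here (again no Spark; arrays_zip_model is a hypothetical name). It assumes two things: fields are named by position, as the printed schema above shows for Spark 2.4, and shorter input arrays are padded with null, which as far as I know is how arrays_zip treats inputs of unequal length.

```python
from itertools import zip_longest

def arrays_zip_model(*arrays):
    """Zip arrays position by position, padding shorter ones with None,
    and name each struct field by its position ("0", "1", ...)."""
    return [
        {str(i): value for i, value in enumerate(values)}
        for values in zip_longest(*arrays, fillvalue=None)
    ]

# array(lit("")) has a single element, the other two arrays have three.
zipped = arrays_zip_model([""], [1, 2, 3], ["d1", "d2", "d3"])

for row in zipped:
    print(row)
# {'0': '', '1': 1, '2': 'd1'}
# {'0': None, '1': 2, '2': 'd2'}
# {'0': None, '1': 3, '2': 'd3'}
```

If the padding assumption holds, only the first element gets the empty string and the others get null in the new field. In show output like the one above the two can look the same, so it may be worth checking if the distinction matters to you.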
Spark 3.1+
withField can be used together with transform.
- Scala
Input:
import org.apache.spark.sql.functions._
import spark.implicits._  // for the $"..." column syntax

val df = spark.createDataFrame(Seq((1, "2")))
  .select(
    array(struct(
      col("_1").as("Elem"),
      col("_2").as("Desc")
    )).as("Elems")
  )
df.printSchema()
// root
// |-- Elems: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- Elem: integer (nullable = true)
// | | |-- Desc: string (nullable = true)
Script:
val df2 = df.withColumn(
  "Elems",
  // withField appends New_field to every struct element of the array
  transform(
    $"Elems",
    x => x.withField("New_field", lit(3))
  )
)
df2.printSchema()
// root
// |-- Elems: array (nullable = false)
// | |-- element: struct (containsNull = false)
// | | |-- Elem: integer (nullable = true)
// | | |-- Desc: string (nullable = true)
// | | |-- New_field: integer (nullable = false)
- PySpark
Input:
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, "2",)]) \
    .select(
        F.array(F.struct(
            F.col("_1").alias("Elem"),
            F.col("_2").alias("Desc")
        )).alias("Elems")
    )
df.printSchema()
# root
# |-- Elems: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- Elem: long (nullable = true)
# | | |-- Desc: string (nullable = true)
Script:
df = df.withColumn(
    "Elems",
    # withField appends New_field to every struct element of the array
    F.transform(
        F.col("Elems"),
        lambda x: x.withField("New_field", F.lit(3))
    )
)
df.printSchema()
# root
# |-- Elems: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- Elem: long (nullable = true)
# | | |-- Desc: string (nullable = true)
# | | |-- New_field: integer (nullable = false)
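Solution here. We need to use arrays_zip and then rename the fields of the resulting column by casting it to a struct schema that carries the desired names (elem_struct_recomposed):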
val elem_struct_recomposed = new StructType()
.add("New_field", StringType)
.add("ElemRenamed", IntegerType)
.add("DescRenamed", StringType)
val mod_df = df
.withColumn("modif_elems_NOT_renamed",
arrays_zip(
array(lit("")).as("New_field"),
col("Elems.Elem").as("ElemRenamed"),
col("Elems.Desc").alias("DescRenamed")
))
.withColumn("modif_elems_renamed",
$"modif_elems_NOT_renamed".cast(ArrayType(elem_struct_recomposed)))
mod_df.show(false)
mod_df.printSchema
+---------------------------+---------------------------------+---------------------------------+
|Elems |modif_elems_NOT_renamed |modif_elems_renamed |
+---------------------------+---------------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+---------------------------------+
root
|-- Elems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Elem: integer (nullable = true)
| | |-- Desc: string (nullable = true)
|-- modif_elems_NOT_renamed: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- 0: string (nullable = true)
| | |-- 1: integer (nullable = true)
| | |-- 2: string (nullable = true)
|-- modif_elems_renamed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- New_field: string (nullable = true)
| | |-- ElemRenamed: integer (nullable = true)
| | |-- DescRenamed: string (nullable = true)
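The cast works as a rename because a struct-to-struct cast matches fields by position, not by name. A minimal plain-Python sketch of that idea (no Spark; cast_struct_fields is a hypothetical helper, and the None values assume arrays_zip padded the one-element New_field array with nulls):

```python
def cast_struct_fields(array_of_structs, new_names):
    """Model of casting array<struct> to a struct type with other field names:
    values keep their position, only the names change."""
    renamed = []
    for s in array_of_structs:
        values = list(s.values())
        if len(values) != len(new_names):
            raise ValueError("field counts must match")
        renamed.append(dict(zip(new_names, values)))
    return renamed

# Stand-in for modif_elems_NOT_renamed, with positional field names.
not_renamed = [
    {"0": "", "1": 1, "2": "d1"},
    {"0": None, "1": 2, "2": "d2"},  # None: assumed null padding
    {"0": None, "1": 3, "2": "d3"},
]

renamed = cast_struct_fields(
    not_renamed, ["New_field", "ElemRenamed", "DescRenamed"]
)
print(renamed[0])
# {'New_field': '', 'ElemRenamed': 1, 'DescRenamed': 'd1'}
```

Because the match is positional, the order of the fields in elem_struct_recomposed has to follow the order of the arguments passed to arrays_zip.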