Flatten + (~self-join) a Spark DataFrame with an array of structs in Scala



Input DataFrame:

{
  "F1" : "A",
  "F2" : "B",
  "F3" : [
    {
      "name" : "N1",
      "sf1" : "val_1",
      "sf2" : "val_2"
    },
    {
      "name" : "N2",
      "sf1" : "val_3",
      "sf2" : "val_4"
    }
  ],
  "F4" : {
    "SF1" : "val_5",
    "SF2" : "val_6",
    "SF3" : "val_7"
  }
}

Desired output:

[
  {
    "F1" : "A",
    "F2" : "B",
    "F3_name" : "N1",
    "F3_sf1" : "val_1",
    "F3_sf2" : "val_2",
    "F4_SF1" : "val_5",
    "F4_SF2" : "val_6",
    "F4_SF3" : "val_7"
  },
  {
    "F1" : "A",
    "F2" : "B",
    "F3_name" : "N2",
    "F3_sf1" : "val_3",
    "F3_sf2" : "val_4",
    "F4_SF1" : "val_5",
    "F4_SF2" : "val_6",
    "F4_SF3" : "val_7"
  }
]

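F3 is an array of structs. The new DataFrame should be flat, and this single row should become one or more rows depending on the number of items in F3 (2 rows in this example).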

I am new to Spark and Scala. Any ideas on how to achieve this transformation would be very helpful.

Thanks!

You can also use explode first. After that, you can extract and rename the fields with a series of aliases (for example, $"F3.name" as "F3_name"):

scala> case class NameSF(name: String, sf1: String, sf2: String)
defined class NameSF
scala> case class SF(SF1: String, SF2: String, SF3: String)
defined class SF
scala> case class F(F1: String, F2: String, F3: Array[NameSF], F4: SF)
defined class F
scala> val elem = F("A",
|              "B",
|              Array(NameSF("N1", "val_1", "val_2"), NameSF("N2", "val_3", "val_4")),
|              SF("val_5", "val_6", "val_7"))
elem: F = F(A,B,[LNameSF;@2939bfa0,SF(val_5,val_6,val_7))
scala> val df = spark.createDataset(Seq(elem)).toDF
df: org.apache.spark.sql.DataFrame = [F1: string, F2: string ... 2 more fields]
scala> df.withColumn("F3", explode($"F3")).select($"F1",
|                                            $"F2",
|                                            $"F3.name" as "F3_name",
|                                            $"F3.sf1" as "F3_sf1",
|                                            $"F3.sf2" as "F3_sf2",
|                                            $"F4.SF1" as "F4_SF1",
|                                            $"F4.SF2" as "F4_SF2",
|                                            $"F4.SF3" as "F4_SF3").show
+---+---+-------+------+------+------+------+------+                            
| F1| F2|F3_name|F3_sf1|F3_sf2|F4_SF1|F4_SF2|F4_SF3|
+---+---+-------+------+------+------+------+------+
|  A|  B|     N1| val_1| val_2| val_5| val_6| val_7|
|  A|  B|     N2| val_3| val_4| val_5| val_6| val_7|
+---+---+-------+------+------+------+------+------+
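
If the structs have many fields, writing every alias by hand gets tedious. As a rough sketch, the select list can be built from the schema instead. The helper name flattenStructs is hypothetical (not part of Spark), it assumes the df defined in the session above, and it assumes field names contain no dots or other special characters:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: expands every top-level struct column into
// "<parent>_<child>" columns and leaves all other columns untouched.
// Assumes field names contain no dots or other special characters.
def flattenStructs(df: DataFrame): DataFrame = {
  val cols: Array[Column] = df.schema.fields.flatMap { field =>
    field.dataType match {
      case st: StructType =>
        st.fieldNames.map(n => col(s"${field.name}.$n").as(s"${field.name}_$n"))
      case _ =>
        Array(col(field.name))
    }
  }
  df.select(cols: _*)
}

// Explode F3 first so that it becomes a plain struct, then flatten.
val flat = flattenStructs(df.withColumn("F3", explode(col("F3"))))
flat.show()
```

This only handles one level of nesting; deeper structs would need the helper to be applied repeatedly or made recursive.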

You can use inline to explode and expand F3, and * to expand F4:

val df2 = df.selectExpr("F1","F2","inline(F3)","F4.*")
df2.show
+---+---+----+-----+-----+-----+-----+-----+
| F1| F2|name|  sf1|  sf2|  SF1|  SF2|  SF3|
+---+---+----+-----+-----+-----+-----+-----+
|  A|  B|  N1|val_1|val_2|val_5|val_6|val_7|
|  A|  B|  N2|val_3|val_4|val_5|val_6|val_7|
+---+---+----+-----+-----+-----+-----+-----+
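
Note that the columns above keep their original field names, so sf1 and SF1 differ only by case, which can be ambiguous under Spark's default case-insensitive column resolution. One possible way to get the prefixed names from the question is to rename by position with toDF. This is a minimal sketch that assumes the df2 from above and that the column order is exactly the one shown:

```scala
// Rename all eight columns by position; the order must match df2's columns.
val renamed = df2.toDF(
  "F1", "F2",
  "F3_name", "F3_sf1", "F3_sf2",
  "F4_SF1", "F4_SF2", "F4_SF3")
renamed.show()
```

Because toDF is purely positional, it avoids referring to the ambiguous names at all, but it has to be updated by hand if the schema changes.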
