我有两个数据帧:
val df1 = sc.parallelize(Seq((123, 2.23, 1.12), (234, 2.45, 0.12), (456, 1.112, 0.234))).toDF("objid", "ra", "dec")
val df2 = sc.parallelize(Seq((4567, 123, "name1", "val1"), (2322, 456, "name2", "val2"), (3324, 555, "name3", "val3"), (5556, 123, "name4", "val4"), (3345, 123, "name5", "val5"))).toDF("specid", "objid", "name", "value")
它们看起来如下:
df1.show()
+-----+-----+-----+
|objid| ra| dec|
+-----+-----+-----+
| 123| 2.23| 1.12|
| 234| 2.45| 0.12|
| 456|1.112|0.234|
+-----+-----+-----+
df2.show()
+------+-----+-----+-----+
|specid|objid| name|value|
+------+-----+-----+-----+
| 4567| 123|name1| val1|
| 2322| 456|name2| val2|
| 3324| 555|name3| val3|
| 5556| 123|name4| val4|
| 3345| 123|name5| val5|
+------+-----+-----+-----+
现在我想将df2作为嵌套列嵌套在df1中,因此模式应该如下所示:
val new_schema = df1.schema.add("specs", df2.schema)
new_schema: org.apache.spark.sql.types.StructType = StructType(StructField(objid,IntegerType,false), StructField(ra,DoubleType,false), StructField(dec,DoubleType,false), StructField(specs,StructType(StructField(specid,IntegerType,false), StructField(objid,IntegerType,false), StructField(name,StringType,true), StructField(value,StringType,true)),true))
我之所以想这样做,是因为df1和df2之间存在一对多的关系,这意味着每个对象有超过1个规范。我不会只加入这两张桌子。大约有50个表,我想最终连接在一起创建一个巨型表。这些表中的大多数都有1到n的关系,我只是在想一种方法来避免在最终的联接结果中有很多重复的行和空单元格。
最终结果看起来像:
+-----+-----+-----+----------------------+
| | specs |
|objid| ra| dec| specid| name | value|
+-----+-----+-----+------+----+-------+ |
| 123| 2.23| 1.12| 4567 | name1 | val1 |
| | 5556 | name4 | val4 |
| | 3345 | name5 | val5 |
+-----+-----+-----+----------------------+
| 234| 2.45| 0.12| |
+-----+-----+-----+----------------------+
| 456|1.112|0.234| 2322 | name2 | val2 |
+-----+-----+-----+----------------------+
我试图使用.withColumn
将列添加到df1,但遇到了错误。
我实际上想做的是从df2中选择条件为where df2.objid = df1.objid
的所有列来匹配行,并使其成为df1中的新列,但我不确定这是否是最好的方法。即便如此,我也不知道该怎么做。
有人能告诉我怎么做吗?
据我所知,您不能在另一个数据帧内拥有数据帧(RDD也是如此)。
您需要的是两个数据帧之间的连接。您可以执行不同类型的联接,并联接来自两个数据帧的行(这是在df1中嵌套df2列的地方)
您需要join
这两个数据帧都基于列objid
,如下面的列
val join = df1.join(df2, "objid")
join.printSchema()
输出:
root
|-- objid: integer (nullable = false)
|-- ra: double (nullable = false)
|-- dec: double (nullable = false)
|-- specid: integer (nullable = false)
|-- name: string (nullable = true)
|-- value: string (nullable = true)
当我们说
join.show()
输出将是
+-----+-----+-----+------+-----+-----+
|objid| ra| dec|specid| name|value|
+-----+-----+-----+------+-----+-----+
| 456|1.112|0.234| 2322|name2| val2|
| 123| 2.23| 1.12| 4567|name1| val1|
+-----+-----+-----+------+-----+-----+
有关更多详细信息,您可以在这里查看
更新:
我想你正在寻找类似的东西
df1.join(df2, df1("objid") === df2("objid"), "left_outer").show()
输出为:
+-----+-----+-----+------+-----+-----+-----+
|objid| ra| dec|specid|objid| name|value|
+-----+-----+-----+------+-----+-----+-----+
| 456|1.112|0.234| 2322| 456|name2| val2|
| 234| 2.45| 0.12| null| null| null| null|
| 123| 2.23| 1.12| 4567| 123|name1| val1|
| 123| 2.23| 1.12| 5556| 123|name4| val4|
| 123| 2.23| 1.12| 3345| 123|name5| val5|
+-----+-----+-----+------+-----+-----+-----+