Note: I have 20 columns and millions of rows.
df_a = spark.createDataFrame([('A', 'X', 1), ('B', 'Y', 2), ('G', 'W', 7)], ["val_1", "val_2", "unique_ID"])
df_a.show()
+-----+-----+---------+
|val_1|val_2|unique_ID|
+-----+-----+---------+
| A| X| 1|
| B| Y| 2|
| G| W| 7|
+-----+-----+---------+
df_b = spark.createDataFrame([('A', 'X'), ('B', 'Y'), ('G', 'W'),('B', 'Y'),('A', 'X'), ('G', 'W'), ('G', 'W')], ["val_1", "val_2"])
df_b.show()
+-----+-----+
|val_1|val_2|
+-----+-----+
| A| X|
| B| Y|
| G| W|
| B| Y|
| A| X|
| G| W|
| G| W|
+-----+-----+
#Expected result:
+-----+-----+-----+---------+
|sl.no|val_1|val_2|unique_ID|
+-----+-----+-----+---------+
| 1| A| X| 1|
| 2| B| Y| 2|
| 3| G| W| 7|
| 4| B| Y| 2|
| 5| A| X| 1|
| 6| G| W| 7|
| 7| G| W| 7|
+-----+-----+-----+---------+
I want to create the column unique_ID in a Spark DataFrame (as shown in the expected result above), assigning each row of df_b the unique_ID from df_a wherever val_1 and val_2 match.
You can join the two DataFrames on val_1 and val_2:
result = df_a.join(df_b, ['val_1', 'val_2'])
result.show()
+-----+-----+---------+
|val_1|val_2|unique_ID|
+-----+-----+---------+
| A| X| 1|
| A| X| 1|
| B| Y| 2|
| B| Y| 2|
| G| W| 7|
| G| W| 7|
| G| W| 7|
+-----+-----+---------+
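Since you mention millions of rows, the join strategy may matter. A minimal sketch using Spark's broadcast hint, assuming df_a is the smaller lookup side (that is an assumption; your post does not say which DataFrame is large):

from pyspark.sql.functions import broadcast
# Broadcast the (assumed) smaller lookup table to every executor,
# avoiding a shuffle of the large side. Only do this if df_a
# comfortably fits in memory.
result = df_b.join(broadcast(df_a), ['val_1', 'val_2'])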
If you want to add an index column, you can use F.monotonically_increasing_id():
import pyspark.sql.functions as F
result = df_a.join(df_b, ['val_1', 'val_2']).withColumn('id', F.monotonically_increasing_id())
result.show()
+-----+-----+---------+------------+
|val_1|val_2|unique_ID| id|
+-----+-----+---------+------------+
| A| X| 1| 25769803776|
| A| X| 1| 25769803777|
| B| Y| 2|257698037760|
| B| Y| 2|257698037761|
| G| W| 7|472446402560|
| G| W| 7|472446402561|
| G| W| 7|472446402562|
+-----+-----+---------+------------+
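Note that monotonically_increasing_id() guarantees unique, monotonically increasing ids, but not consecutive ones, which is why the large gaps appear above. Also, the join itself does not preserve df_b's row order. If you need the consecutive sl.no values from your expected result, one approach is the sketch below (sl_no is used instead of sl.no, since dots in Spark column names require backtick-escaping):

from pyspark.sql import Window
import pyspark.sql.functions as F

# Tag df_b with an ordering id *before* the join so the original
# row order can be recovered afterwards.
df_b_tagged = df_b.withColumn('_order', F.monotonically_increasing_id())

# row_number() over that order yields consecutive values 1..N.
# Caution: a window without partitionBy pulls all rows into a
# single partition, which is only acceptable for modest result sizes.
w = Window.orderBy('_order')
result = (df_b_tagged.join(df_a, ['val_1', 'val_2'])
          .withColumn('sl_no', F.row_number().over(w))
          .drop('_order')
          .select('sl_no', 'val_1', 'val_2', 'unique_ID'))
result.show()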