I have a dataframe that contains duplicate rows, and I want to merge them into a single record containing all the distinct column values.
My sample code is as follows:
df1 = sqlContext.createDataFrame(
    [("81A01", "TERR NAME 01", "NJ", "", ""),
     ("81A01", "TERR NAME 01", "", "NY", ""),
     ("81A01", "TERR NAME 01", "", "", "LA"),
     ("81A02", "TERR NAME 01", "CA", "", ""),
     ("81A02", "TERR NAME 01", "", "", "NY")],
    ["zip_code", "territory_name", "state", "state1", "state2"])
The resulting dataframe looks like this:
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|      |      |
|   81A01|  TERR NAME 01|     |    NY|      |
|   81A01|  TERR NAME 01|     |      |    LA|
|   81A02|  TERR NAME 01|   CA|      |      |
|   81A02|  TERR NAME 01|     |      |    NY|
+--------+--------------+-----+------+------+
I need to merge/combine the duplicate records based on zip_code and get all the distinct state values in a single row.
Expected result:
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    NY|
+--------+--------------+-----+------+------+
I am new to pyspark and not sure how to use group by / join here. Could someone help with the code?
If you are sure there is only one state, one state1, and one state2 per zip_code/territory_name combination, you can use the code below. The max function works on strings here: within each group, a non-empty string compares greater than the empty string "" (lexicographically, by character code), so max returns the non-empty value whenever the group contains one, as the short sketch after the result illustrates.
from pyspark.sql.types import *
from pyspark.sql.functions import *
df1 = sqlContext.createDataFrame(
    [("81A01", "TERR NAME 01", "NJ", "", ""),
     ("81A01", "TERR NAME 01", "", "NY", ""),
     ("81A01", "TERR NAME 01", "", "", "LA"),
     ("81A02", "TERR NAME 01", "CA", "", ""),
     ("81A02", "TERR NAME 01", "", "", "NY")],
    ["zip_code", "territory_name", "state", "state1", "state2"])
df1.groupBy("zip_code", "territory_name").agg(
    max("state").alias("state"),
    max("state1").alias("state1"),
    max("state2").alias("state2")).show()
Result:
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A02|  TERR NAME 01|   CA|      |    NY|
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
+--------+--------------+-----+------+------+
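A minimal sketch of that comparison behavior in plain Python (the values are taken from the sample data; Spark's default string ordering is assumed to compare the same way, lexicographically):

print("NJ" > "")            # True: "" compares less than any non-empty string
print(max(["", "", "NJ"]))  # NJ -- max skips the blanks within a group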
Note: for any unique combination of zip_code and territory_name, if there is more than one entry under any of the state columns, those entries will be concatenated, as the sketch below shows.
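For instance, a minimal sketch of that caveat (the second "CT" row is hypothetical, not part of the sample data, and sc is assumed to be the active SparkContext, consistent with the sqlContext usage above):

rows = [("81A01", "NJ"), ("81A01", "CT")]  # two non-empty values, same key
merged = sc.parallelize(rows).reduceByKey(lambda x, y: x + y)
print(merged.collect())  # [('81A01', 'NJCT')] -- the values get glued together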
Some explanation: in this code I used RDDs. I first mapped each record into two tuples, tuple1 as the key and tuple2 as the value, and then reduced by key. tuple1 corresponds to (zip_code, territory_name), and tuple2 holds the 3 state columns; in the reduceByKey lambda, x and y are two such state tuples that share the same key. tuple1 is treated as the key because we want to group by the distinct values of zip_code and territory_name. So every distinct pair like (81A01, TERR NAME 01) and (81A02, TERR NAME 01) is a key, and we reduce over the values of each key, as the sketch below illustrates.
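To make that key/value shape concrete, here is a small sketch of what the first map step emits for the sample rows (output shown as a comment, abbreviated to the first two pairs):

pairs = df1.rdd.map(
    lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))
print(pairs.take(2))
# [(('81A01', 'TERR NAME 01'), ('NJ', '', '')),
#  (('81A01', 'TERR NAME 01'), ('', 'NY', ''))]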
Reduce means taking the values two at a time, performing some operation on them, and then repeating that same operation on the result and the next element until the whole sequence is exhausted. So reducing (1, 2, 3, 4, 5) with the + operation goes: 1+2=3, then 3+3=6, and the + operation continues until the last element is reached, so 6+4=10 and finally 10+5=15. Since the sequence ends at 5, the result is 15. That is how reduce works with the + operation. Because here we have strings rather than numbers, + performs concatenation instead: A+B=AB.
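The same idea as a minimal sketch in plain Python, using functools.reduce with + (this mirrors the worked example above; it is not Spark's API):

from functools import reduce
from operator import add

print(reduce(add, (1, 2, 3, 4, 5)))  # 15: ((((1+2)+3)+4)+5
print(reduce(add, ("A", "B")))       # AB: + concatenates strings
print(reduce(add, ("NJ", "", "")))   # NJ: empty strings add nothing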
df1 = (df1.rdd
       # key = (zip_code, territory_name), value = (state, state1, state2)
       .map(lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))
       # element-wise string concatenation of the value tuples per key
       .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
       # flatten (key, value) back into a flat 5-column row
       .map(lambda r: (r[0][0], r[0][1], r[1][0], r[1][1], r[1][2]))
       .toDF(["zip_code", "territory_name", "state", "state1", "state2"]))
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    NY|
+--------+--------------+-----+------+------+