Merge duplicate records into a single record in a PySpark DataFrame



I have a DataFrame that contains duplicate rows, and I want to merge them into a single record that carries all the distinct column values.

A sample of my code:

df1 = sqlContext.createDataFrame(
    [("81A01", "TERR NAME 01", "NJ", "", ""),
     ("81A01", "TERR NAME 01", "", "NY", ""),
     ("81A01", "TERR NAME 01", "", "", "LA"),
     ("81A02", "TERR NAME 01", "CA", "", ""),
     ("81A02", "TERR NAME 01", "", "", "NY")],
    ["zip_code", "territory_name", "state", "state1", "state2"])

The resulting DataFrame looks like this:

df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|      |      |
|   81A01|  TERR NAME 01|     |    NY|      |
|   81A01|  TERR NAME 01|     |      |    LA|
|   81A02|  TERR NAME 01|   CA|      |      |
|   81A02|  TERR NAME 01|     |      |    NY|
+--------+--------------+-----+------+------+

I need to merge/combine the duplicate records based on zip_code and collect all the distinct state values in a single row.

Expected result:

+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    NY|
+--------+--------------+-----+------+------+

I am new to pyspark and not sure how to use group by / join for this. Could someone help with the code?

If you are sure there is at most one state, one state1, and one state2 value per zip_code/territory_name combination, you can use the code below. The max function works on strings here: whenever the grouped data contains a non-empty string, max returns it, because a non-empty string compares greater (lexicographically, i.e. byte-wise) than the empty string "".

from pyspark.sql.types import *
from pyspark.sql.functions import *   # "max" below is pyspark.sql.functions.max

df1 = sqlContext.createDataFrame(
    [("81A01", "TERR NAME 01", "NJ", "", ""),
     ("81A01", "TERR NAME 01", "", "NY", ""),
     ("81A01", "TERR NAME 01", "", "", "LA"),
     ("81A02", "TERR NAME 01", "CA", "", ""),
     ("81A02", "TERR NAME 01", "", "", "NY")],
    ["zip_code", "territory_name", "state", "state1", "state2"])

# max over strings returns the non-empty value in each group,
# because "" compares lower than any non-empty string.
df1.groupBy("zip_code", "territory_name").agg(
    max("state").alias("state"),
    max("state1").alias("state1"),
    max("state2").alias("state2")).show()

Result:

+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A02|  TERR NAME 01|   CA|      |    NY|
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
+--------+--------------+-----+------+------+
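Why max works here: Python and Spark SQL compare strings lexicographically, and the empty string sorts below every non-empty string. A quick plain-Python check (illustration only, not part of the solution itself):

# "" sorts below any non-empty string, so max() prefers the non-empty value.
print(max(["", "NJ"]))       # NJ
print(max(["", "", "CA"]))   # CA
print(max(["", ""]))         # "" (only stays empty when all values are empty)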

Note: for any unique (zip_code, territory_name) record, if there is more than one non-empty entry under any of the state columns, the RDD approach below will concatenate them (see the sketch after its output).

Some explanation: in this code I use RDDs. I first split every record into two tuples, with tuple1 acting as the key and tuple2 as the value; then I reduce by key. Here x corresponds to tuple1, i.e. (zip_code, territory_name), while tuple2 holds the three state columns. tuple1 is treated as the key because we want to group by the distinct values of zip_code and territory_name. So every distinct pair like (81A01, TERR NAME 01) and (81A02, TERR NAME 01) becomes a key, and we reduce on that basis. Reducing means taking two values at a time, performing some operation on them, and then repeating the same operation on that result and the next element, until the whole tuple is exhausted.

So, reducing (1,2,3,4,5) with the + operation runs as 1+2=3, then 3+3=6, and the + operation continues until the last element is reached: 6+4=10 and finally 10+5=15. Since the tuple ends with 5, the result is 15. That is how reduce works with the + operation. Because here we have strings rather than numbers, concatenation happens instead: A+B=AB.
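The same mechanics can be sketched in plain Python with functools.reduce, which plays the role of Spark's reduceByKey within a single key (an illustration only, not part of the solution):

from functools import reduce

# Numeric reduce with +: ((((1+2)+3)+4)+5) = 15
print(reduce(lambda x, y: x + y, (1, 2, 3, 4, 5)))  # 15

# With strings, + concatenates: "A" + "B" = "AB"
print(reduce(lambda x, y: x + y, ("A", "B")))       # AB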

df1 = (df1.rdd
    # key = (zip_code, territory_name), value = the three state columns
    .map(lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))
    # concatenate the state strings pairwise for every key
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
    # flatten ((zip, terr), (s, s1, s2)) back into a 5-tuple
    .map(lambda r: (r[0][0], r[0][1], r[1][0], r[1][1], r[1][2]))
    .toDF(["zip_code", "territory_name", "state", "state1", "state2"]))
df1.show()
+--------+--------------+-----+------+------+
|zip_code|territory_name|state|state1|state2|
+--------+--------------+-----+------+------+
|   81A01|  TERR NAME 01|   NJ|    NY|    LA|
|   81A02|  TERR NAME 01|   CA|      |    NY|
+--------+--------------+-----+------+------+
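As noted above, when a key has more than one non-empty entry in the same column, the + operation concatenates them instead of picking one. A small sketch of that caveat, using a hypothetical extra 81A01 row (the "CA" row below is made-up test data):

# Hypothetical data: 81A01 now has two non-empty values under "state".
df2 = sqlContext.createDataFrame(
    [("81A01", "TERR NAME 01", "NJ", "", ""),
     ("81A01", "TERR NAME 01", "CA", "", "")],
    ["zip_code", "territory_name", "state", "state1", "state2"])

(df2.rdd
    .map(lambda r: ((r.zip_code, r.territory_name), (r.state, r.state1, r.state2)))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
    .map(lambda r: (r[0][0], r[0][1], r[1][0], r[1][1], r[1][2]))
    .toDF(["zip_code", "territory_name", "state", "state1", "state2"])
    .show())
# "state" comes out as "NJCA" -- the two values are concatenated, not merged.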
