我有一个pyspark数据帧,如下所示:
+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null| CT|
|222|name1| CT|
|222|name2| CT|
|333|name3| CT|
|333|name4| CT|
|333| null| CT|
+---+-----+-----+
对于给定的ID,如果ID不重复,即使列"name"为null,我也希望保留该记录,但如果ID重复,则我希望检查name列,确保其在该ID中不包含重复项,如果"name"仅对重复的ID为null,则我也要删除该记录。以下是所需的输出:
+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null| CT|
|222|name1| CT|
|222|name2| CT|
|333|name3| CT|
|333|name4| CT|
+---+-----+-----+
如何在PySpark中实现这一点?
您可以通过按id列分组并计算每组中的名称数量来实现这一点。默认情况下,Spark中会忽略空值,因此任何计数为0的组都应该保留。我们现在可以过滤掉计数大于0的组中的任何null。
在Scala中,这可以通过如下窗口函数来实现:
val w = Window.partitionBy("id")
val df2 = df.withColumn("gCount", count($"name").over(w))
.filter($"name".isNotNull or $"gCount" === 0)
.drop("gCount")
PySpark等价物:
w = Window.partitionBy("id")
df.withColumn("gCount", count("name").over(w))
.filter((col("name").isNotNull()) | (col("gCount") == 0))
.drop("gCount")
以上操作不会删除同一id具有多个null的行(所有这些都将保留(。
如果也应该删除这些,只保留一行name==null
,那么一个简单的方法是在运行上述代码之前或之后使用.dropDuplicates(['id','name'])
。注意,这也将去除任何其他重复项(在这种情况下,.dropDuplicates(['id','name', 'state'])
可能是优选的(。
我认为可以通过两个步骤来完成。首先,id
的计数值
import pyspark.sql.window as psw
w = psw.Window.partitionBy("id")
df = df.withColumn("n",psf.sum(psf.lit(1)).over(w))
然后在n<1
:时过滤以去除Null
df.filter(!((psf.col('name').isNull()) & (psf.col('n') > 1)))
编辑
正如@Shubham Jain所提到的,如果name
有几个Null
值(重复(,上面的过滤器会保留它们。在这种情况下,@Shaido提出的解决方案很有用:使用.dropDuplicates(['id','name'])
添加后处理。或者.dropDuplicates(['id','name','state'])
,按照您的喜好