在pyspark中执行partitionBy列时,消除特定列的空值行



我有一个pyspark数据帧,如下所示:

+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null|   CT|
|222|name1|   CT|
|222|name2|   CT|
|333|name3|   CT|
|333|name4|   CT|
|333| null|   CT|
+---+-----+-----+

对于给定的ID,如果ID不重复,即使列"name"为null,我也希望保留该记录,但如果ID重复,则我希望检查name列,确保其在该ID中不包含重复项,如果"name"仅对重复的ID为null,则我也要删除该记录。以下是所需的输出:

+-----+---+-----+
| id| name|state|
+-----+---+-----+
|111| null|   CT|
|222|name1|   CT|
|222|name2|   CT|
|333|name3|   CT|
|333|name4|   CT|
+---+-----+-----+

如何在PySpark中实现这一点?

您可以通过按id列分组并计算每组中的名称数量来实现这一点。默认情况下,Spark中会忽略空值,因此任何计数为0的组都应该保留。我们现在可以过滤掉计数大于0的组中的任何null。

在Scala中,这可以通过如下窗口函数来实现:

val w = Window.partitionBy("id")
val df2 = df.withColumn("gCount", count($"name").over(w))
.filter($"name".isNotNull or $"gCount" === 0)
.drop("gCount")

PySpark等价物:

w = Window.partitionBy("id")
df.withColumn("gCount", count("name").over(w))
.filter((col("name").isNotNull()) | (col("gCount") == 0))
.drop("gCount")

以上操作不会删除同一id具有多个null的行(所有这些都将保留(。

如果也应该删除这些,只保留一行name==null,那么一个简单的方法是在运行上述代码之前或之后使用.dropDuplicates(['id','name'])。注意,这也将去除任何其他重复项(在这种情况下,.dropDuplicates(['id','name', 'state'])可能是优选的(。

我认为可以通过两个步骤来完成。首先,id的计数值

import pyspark.sql.window as psw
w = psw.Window.partitionBy("id")
df = df.withColumn("n",psf.sum(psf.lit(1)).over(w))

然后在n<1:时过滤以去除Null

df.filter(!((psf.col('name').isNull()) & (psf.col('n') > 1)))

编辑

正如@Shubham Jain所提到的,如果name有几个Null值(重复(,上面的过滤器会保留它们。在这种情况下,@Shaido提出的解决方案很有用:使用.dropDuplicates(['id','name'])添加后处理。或者.dropDuplicates(['id','name','state']),按照您的喜好

相关内容

  • 没有找到相关文章

最新更新