PySpark-为单个行而不是所有记录设置/删除列



我想找到一个解决方案,在进行粘合转换后处理AWS DynamoDB中的空值。通常情况下,数据为null,当查询完成时,列不会填充某些字段。。。但是在使用胶水的转换中,字段被设置为null。。。因此显示为空。

我在上找到了以下脚本

def drop_null_columns(df):
import pyspark.sql.functions as F
null_counts = (
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
.collect()[0]
.asDict()
)
to_drop = [k for (k, v) in null_counts.items() if v > 0]
newdf = df.drop(*to_drop)
return newdf

但是这个脚本会删除所有甚至有1个null的列,例如下面的示例。

之前

C1  | C2  | C3
------------
123 |null | 12
123 |15   | 12
123 |15   | 12
123 |12   | 12

C1  | C3
------------
123 | 12
123 | 12
123 | 12
123 | 12

我希望发生的是,将存在NULL的行设置为空白/空。

之前

C1  | C2  | C3
------------
123 |null | 12
123 |15   | 12
123 |15   | 12
123 |12   | 12

C1  | C2  | C3
------------
123 |     | 12
123 |15   | 12
123 |15   | 12
123 |12   | 12

Spark自动推断出C2字段的类型为long,因此在替换时需要指定long类型的值。如果推断出C2字段的类型是string,则可以使用df.fillna('')

newdf = df.fillna(0)

最新更新