如何动态知道 pySpark DF 是否对给定列具有空值/空值?



我必须检查传入的数据是否有任何null""" "值。我必须检查的列不是固定的。我正在从配置中读取,其中为具有允许空能力的不同文件存储了列名。

+----------+------------------+--------------------------------------------+
| FileName |     Nullable     |                  Columns                   |
+----------+------------------+--------------------------------------------+
| Sales    | Address2,Phone2  | OrderID,Address1,Address2,Phone1,Phone2    |
| Invoice  | Bank,OfcAddress  | InvoiceNo,InvoiceID,Amount,Bank,OfcAddress |
+----------+------------------+--------------------------------------------+

因此,对于每个数据/文件,我必须查看哪个字段不应包含null。基于该过程/错误输出文件。有没有pythonic方法可以做到这一点?

您显示的表结构使我相信您已将包含这些作业详细信息的文件读取为 Spark 数据帧。你可能不应该,因为它很可能不是大数据。如果将其作为 Spark 数据帧,请将其collect到驱动程序,以便可以为每个文件创建单独的 Spark 作业。

然后,每个作业都相当简单:您必须从某个文件位置读取。这些信息是由FileName捕获的,我猜想。现在,我还假设每个文件的文件格式是相同的。如果没有,则必须添加指示文件格式的元数据。目前,我认为它是CSV。

接下来,必须确定需要检查是否存在空值的列子集。这很简单:假设您有一个 DataFrame 中所有列的列表(可能派生自上一步(加载(生成的 DataFrame(和一个可以包含 null 的所有列的列表,则不能包含 null 的列列表只是这两者之间的区别。

最后,agg通过数据帧重新控制每个列中的空值数。由于这是数据帧聚合,因此结果集中只有一行,因此你可以head将其引入驱动程序。强制转换是字典,以便更轻松地访问属性。

我添加了一个函数summarize_positive_counts,它返回至少找到一条空记录的列,从而使原始表中的声明无效。

df.show(truncate=False)
# +--------+---------------+------------------------------------------+
# |FileName|Nullable       |Columns                                   |
# +--------+---------------+------------------------------------------+
# |Sales   |Address2,Phone2|OrderID,Address1,Address2,Phone1,Phone2   |
# |Invoice |Bank,OfcAddress|InvoiceNo,InvoiceID,Amount,Bank,OfcAddress|
# +--------+---------------+------------------------------------------+
jobs = df.collect()  # bring it to the driver, to create new Spark jobs from its 
from pyspark.sql.functions import col, sum as spark_sum

def report_null_counts(frame, job):
cols_to_verify_not_null = (set(job.Columns.split(","))
.difference(job.Nullable.split(",")))
null_counts = frame.agg(*(spark_sum(col(_).isNull().cast("int")).alias(_)
for _ in cols_to_verify_not_null))
return null_counts.head().asDict()

def summarize_positive_counts(filename, null_counts):
return {filename: [colname for colname, nbr_of_nulls in null_counts.items()
if nbr_of_nulls > 0]}

for job in jobs:  # embarassingly parallellizable
frame = spark.read.csv(job.FileName, header=True)
null_counts = report_null_counts(frame, job)
print(summarize_positive_counts(job.FileName, null_counts))

最新更新