通过 pyspark 数据帧上的 SQL 查询查找所有空值



我有一个混合模式(DoubleTypeStringTypeLongType等)的StructField数据帧。

我想"迭代"所有列以返回摘要统计信息。 例如:

set_min = df.select([
fn.min(self.df[c]).alias(c) for c in self.df.columns
]).collect()

是我用来查找每列中的最小值的。 这工作正常。 但是当我尝试类似设计的东西来查找 Null 时:

set_null = df.filter(
(lambda x: self.df[x]).isNull().count()
).collect()

我得到了有意义的TypeError: condition should be string or Column,我正在传递一个函数。

或列表理解:

set_null = self.df[c].alias(c).isNull() for c in self.df.columns

然后我尝试将 SQL 查询作为字符串传递:

set_null = df.filter('SELECT fields FROM table WHERE column = NUL').collect()

我得到:

ParseException: "nmismatched input 'FROM' expecting <EOF>(line 1, pos 14)nn== SQL ==nSELECT fields FROM table WHERE column = NULLn--------------^^^n"

如何将我的函数作为"字符串或列"传递,以便我可以使用filterwhere或者,为什么纯 SQL 语句不起作用?

您的尝试的几个部分存在错误:

  • 列表理解示例中缺少方括号
  • 你错过了NUL的L
  • 你的纯SQL不起作用,因为filter/where需要一个where子句,而不是一个完整的SQL语句;它们只是别名,我更喜欢使用where所以更清楚你只需要给出这样一个子句

最后你不需要使用where,就像卡尔森也显示的那样。但是从总数中减去意味着您必须评估数据帧两次(这可以通过缓存来缓解,但仍然不理想)。还有一种更直接的方法:

>>> df.select([fn.sum(fn.isnull(c).cast('int')).alias(c) for c in df.columns]).show()
+---+---+
|  A|  B|
+---+---+
|  2|  3|
+---+---+

这是有效的,因为将布尔值转换为整数会为True提供1,为False提供0。如果您更喜欢 SQL,则等效项为:

df.select([fn.expr('SUM(CAST(({c} IS NULL) AS INT)) AS {c}'.format(c=c)) for c in df.columns]).show()

或者更好,没有演员表:

df.select([fn.expr('SUM(IF({c} IS NULL, 1, 0)) AS {c}'.format(c=c)) for c in df.columns]).show()

如果要计算每列NULL个值,则可以计算非空值并从总数中减去。

例如:

from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
spark = SparkSession.builder.master("local").getOrCreate()

df = spark.createDataFrame(
data=[
(1, None),
(1, 1),
(None, None),
(1, 1),
(None, 1),
(1, None),
],
schema=("A", "B")
)
total = df.count()
missing_counts = df.select(
*[(total - fn.count(col)).alias("missing(%s)" % col) for col in df.columns]
)
missing_counts.show()
>>> +----------+----------+
... |missing(A)|missing(B)|
... +----------+----------+
... |         2|         3|
... +----------+----------+

相关内容

  • 没有找到相关文章

最新更新