我有一个混合模式(DoubleType
、StringType
、LongType
等)的StructField
数据帧。
我想"迭代"所有列以返回摘要统计信息。 例如:
set_min = df.select([
fn.min(self.df[c]).alias(c) for c in self.df.columns
]).collect()
是我用来查找每列中的最小值的。 这工作正常。 但是当我尝试类似设计的东西来查找 Null 时:
set_null = df.filter(
(lambda x: self.df[x]).isNull().count()
).collect()
我得到了有意义的TypeError: condition should be string or Column
,我正在传递一个函数。
或列表理解:
set_null = self.df[c].alias(c).isNull() for c in self.df.columns
然后我尝试将 SQL 查询作为字符串传递:
set_null = df.filter('SELECT fields FROM table WHERE column = NUL').collect()
我得到:
ParseException: "nmismatched input 'FROM' expecting <EOF>(line 1, pos 14)nn== SQL ==nSELECT fields FROM table WHERE column = NULLn--------------^^^n"
如何将我的函数作为"字符串或列"传递,以便我可以使用filter
或where
或者,为什么纯 SQL 语句不起作用?
您的尝试的几个部分存在错误:
- 列表理解示例中缺少方括号
- 你错过了
NUL
的L
分 - 你的纯SQL不起作用,因为
filter
/where
需要一个where子句,而不是一个完整的SQL语句;它们只是别名,我更喜欢使用where
所以更清楚你只需要给出这样一个子句
。
最后你不需要使用where
,就像卡尔森也显示的那样。但是从总数中减去意味着您必须评估数据帧两次(这可以通过缓存来缓解,但仍然不理想)。还有一种更直接的方法:
>>> df.select([fn.sum(fn.isnull(c).cast('int')).alias(c) for c in df.columns]).show()
+---+---+
| A| B|
+---+---+
| 2| 3|
+---+---+
这是有效的,因为将布尔值转换为整数会为True
提供1
,为False
提供0
。如果您更喜欢 SQL,则等效项为:
df.select([fn.expr('SUM(CAST(({c} IS NULL) AS INT)) AS {c}'.format(c=c)) for c in df.columns]).show()
或者更好,没有演员表:
df.select([fn.expr('SUM(IF({c} IS NULL, 1, 0)) AS {c}'.format(c=c)) for c in df.columns]).show()
如果要计算每列NULL
个值,则可以计算非空值并从总数中减去。
例如:
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
spark = SparkSession.builder.master("local").getOrCreate()
df = spark.createDataFrame(
data=[
(1, None),
(1, 1),
(None, None),
(1, 1),
(None, 1),
(1, None),
],
schema=("A", "B")
)
total = df.count()
missing_counts = df.select(
*[(total - fn.count(col)).alias("missing(%s)" % col) for col in df.columns]
)
missing_counts.show()
>>> +----------+----------+
... |missing(A)|missing(B)|
... +----------+----------+
... | 2| 3|
... +----------+----------+