spark如何计算数据框中的记录数?



我知道df.count()将触发spark动作并返回数据框中存在的记录数量,但我想知道这个过程如何在内部工作,spark是否通过整个数据框来计数记录数量,或者是否有任何其他优化技术,如在数据框的元数据中存储值?

我使用的是pyspark 3.2.1.

看起来运行df.count()的底层实际上使用Count聚合类。我是基于在Dataset.scala中count方法的定义。

/**
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}

是否有其他的优化技术,比如将值存储在元数据是dataframe ?

它将采用与Catalyst使用的所有相同的优化策略。它创建表达式的有向图,求值并将它们卷起来。它没有将计数值存储为元数据,这将违反Spark的惰性求值原则。

我做了一个实验,验证了df.count()df.groupBy().count()产生相同的物理计划。

df = spark.createDataFrame(pd.DataFrame({"a": [1,2,3], "b": ["a", "b", "c"]}))
# count using the Dataframe method
df.count()
# count using the Count aggregator
cnt_agg = df.groupBy().count()

两个作业生成相同的Physical Plan:

== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Final Plan ==
* HashAggregate (6)
+- ShuffleQueryStage (5), Statistics(sizeInBytes=64.0 B, rowCount=4, isRuntime=true)
+- Exchange (4)
+- * HashAggregate (3)
+- * Project (2)
+- * Scan ExistingRDD (1)
+- == Initial Plan ==
HashAggregate (8)
+- Exchange (7)
+- HashAggregate (3)
+- Project (2)
+- Scan ExistingRDD (1)

最新更新