处理DataFrame的方法的求值



我有一些方法与DataFrame交互并返回我需要的数字。

private def getExpectedPartitionBytes(df: DataFrame, partitionNames: Seq[String] = Seq())
(implicit spark: SparkSession): Long = {

val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count

val expectedTotalBytes = df.queryExecution.optimizedPlan.stats(spark.sessionState.conf)
.sizeInBytes.toLong

val expectedPartitionBytes = expectedTotalBytes / partitionsCount
// dirty estimation from dataframe dimension
// real size of types divided on two
val maxExpectedPartitionBytes = df.dtypes.filter(t => !partitionNames.contains(t._1)).map(_._2).map {
case "StringType" => 10
case "ByteType" => 1
case "ShortType" => 2
case "IntegerType" => 4
case "LongType" => 8
case "FloatType" => 4
case "DoubleType" => 8
case "TimestampType" => 6
case _ => 2
}.sum * df.count / partitionsCount


if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
// if plan estimation is exists and real
expectedPartitionBytes
} else {
maxExpectedPartitionBytes
}
}

我对这个方法的性能有一个疑问。我们都知道DataFrame是惰性工作的。首先,Spark注册需要在DataFrame上执行的计算。然后,当我们请求计算结果时,Spark开始工作。在这方面,我想问你。请解释一下在我的例子中,DataFrame计算是在什么时候开始的。

在我们给变量赋值的那一刻?

val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count

或者当方法试图返回一个数字给我们的时候?

if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
// if plan estimation is exists and real
expectedPartitionBytes
} else {
maxExpectedPartitionBytes
}

或者计数将在除法时开始?

}.sum * df.count / partitionsCount

我也想知道优化我的方法的可能性。我是否可以合理地为我的DataFrame使用checkpoint()cache()来减少不必要的计算?我该怎么做呢?

请解释一下在我的例子中,DataFrame计算是在什么时候开始的。

立即partitionCount的价值评估,作为count调用是一个"行动"。如果你想延迟计算,你可以在Scala中声明一个lazy变量:

lazy val partitionCount = df.count 
partitionCount: Long = <lazy>

您将看到类型如预期的那样是Long,但是没有关联的值。一旦你第一次访问val,表达式将被求值。

我可以合理地使用检查点()或缓存()为我的DataFrame减少不必要的计算吗?

查看您提供的代码,您没有承担任何重复的工作。

最新更新