我有一些方法与DataFrame交互并返回我需要的数字。
private def getExpectedPartitionBytes(df: DataFrame, partitionNames: Seq[String] = Seq())
(implicit spark: SparkSession): Long = {
val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count
val expectedTotalBytes = df.queryExecution.optimizedPlan.stats(spark.sessionState.conf)
.sizeInBytes.toLong
val expectedPartitionBytes = expectedTotalBytes / partitionsCount
// dirty estimation from dataframe dimension
// real size of types divided on two
val maxExpectedPartitionBytes = df.dtypes.filter(t => !partitionNames.contains(t._1)).map(_._2).map {
case "StringType" => 10
case "ByteType" => 1
case "ShortType" => 2
case "IntegerType" => 4
case "LongType" => 8
case "FloatType" => 4
case "DoubleType" => 8
case "TimestampType" => 6
case _ => 2
}.sum * df.count / partitionsCount
if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
// if plan estimation is exists and real
expectedPartitionBytes
} else {
maxExpectedPartitionBytes
}
}
我对这个方法的性能有一个疑问。我们都知道DataFrame是惰性工作的。首先,Spark注册需要在DataFrame上执行的计算。然后,当我们请求计算结果时,Spark开始工作。在这方面,我想问你。请解释一下在我的例子中,DataFrame计算是在什么时候开始的。
在我们给变量赋值的那一刻?
val partitionsCount = df.select(partitionNames.map(c => col(c)): _*).dropDuplicates.count
或者当方法试图返回一个数字给我们的时候?
if (expectedTotalBytes > 0 && expectedPartitionBytes <= maxExpectedPartitionBytes) {
// if plan estimation is exists and real
expectedPartitionBytes
} else {
maxExpectedPartitionBytes
}
或者计数将在除法时开始?
}.sum * df.count / partitionsCount
我也想知道优化我的方法的可能性。我是否可以合理地为我的DataFrame使用checkpoint()
或cache()
来减少不必要的计算?我该怎么做呢?
请解释一下在我的例子中,DataFrame计算是在什么时候开始的。
立即partitionCount
的价值评估,作为count
调用是一个"行动"。如果你想延迟计算,你可以在Scala中声明一个lazy变量:
lazy val partitionCount = df.count
partitionCount: Long = <lazy>
您将看到类型如预期的那样是Long
,但是没有关联的值。一旦你第一次访问val
,表达式将被求值。
我可以合理地使用检查点()或缓存()为我的DataFrame减少不必要的计算吗?
查看您提供的代码,您没有承担任何重复的工作。