计算Spark中UDF的调用



使用Spark 1.6.1,我想调用UDF被调用的次数。我想这样做是因为我有一个非常昂贵的UDF(每次调用约1秒),并且我怀疑UDF被调用的次数比我的数据帧中的记录数量要多,这使得我的spark作业比必要的慢。

虽然我无法重现这种情况,但我提出了一个简单的示例,显示对UDF的调用次数似乎与行数不同(这里:少),这是怎么回事?

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo extends App {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val callCounter = sc.accumulator(0)
  val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")
  println(df.count) //  gives 10000
  val myudf = udf((d:Int) => {callCounter.add(1);d})
  val res = df.withColumn("result",myudf($"value")).cache
  println(res.select($"result").collect().size) // gives 10000
  println(callCounter.value) // gives 9941
}

如果使用累加器不是调用UDF计数的正确方法,我还能怎么做呢?

注意:在我实际的Spark-Job中,获得的call-count大约是实际记录数的1.7倍。

Spark应用程序应该定义一个main()方法,而不是扩展scala.App。scala的子类。应用程序可能无法正常工作。

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo extends App {
    def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
         val sc = new SparkContext(conf)
         // [...]
    }   
}

相关内容

  • 没有找到相关文章

最新更新