Spark SQL数据帧外部数据源效率低



当我试图对Spark SQL外部数据源进行测试时,会出现此问题。

我以两种方式构建数据帧,并比较收集操作的速度。我发现,如果列号太大,那么从外部数据源构建的数据帧就会落后。我想知道这是否是Spark SQL外部数据源的限制。:-)

为了更清楚地提出这个问题,我写了一段代码:

https://github.com/sunheehnus/spark-sql-test/

在我的外部数据源API的基准测试代码中,它实现了一个伪外部数据源(实际上是RDD[String,Array[Int]]),并通过获取数据帧

val cmpdf = sqlContext.load("com.redislabs.test.dataframeRP", Map[String, String]())

然后我构建相同的RDD并通过获取数据帧

val rdd = sqlContext.sparkContext.parallelize(1 to 2048, 3)
val mappedrdd = rdd.map(x =>(x.toString, (x to x + colnum).toSeq.toArray))
val df = mappedrdd.toDF()
val dataColExpr = (0 to colnum).map(_.toString).zipWithIndex.map { case (key, i) => s"_2[$i] AS `$key`" }
val allColsExpr = "_1 AS instant" +: dataColExpr
val df1 = df.selectExpr(allColsExpr: _*)

当我运行测试代码时,我可以在笔记本电脑上看到结果:

9905
21427

但当我减少列(512)时,我可以看到结果:

4323
2221

看起来问题是,如果Schema中的列数较小,那么外部数据源API将受益,但随着Schema中列数的增长,外部数据源API最终将落后。。。。。。我想知道这是Spark-SQL对外部数据源API的限制,还是我以错误的方式使用API?非常感谢。:-)

您没有在这里对想要进行基准测试的内容进行基准测试。这个基准测试简单地得出结论,与内置代码相比,更昂贵的代码版本(即您编写的代码)更昂贵。

你的结果有两个原因:

  1. 当您声明一个字段(常量或普通列)时,Spark SQL实际上会生成相当优化的代码来展开循环。当您自己实现数据源时,您没有展开循环。事实上,您不仅没有展开循环,而且每行都有一些昂贵的操作,如"(x到x+colnum).toSeq.toArray"。所有这些都在Spark SQL生成的代码中消除了。

  2. 如果"needConversion"为false,则Spark SQL在从数据源获取数据时引入入站转换。此入站转换将外部行格式转换为内部行格式(例如,字符串不再是Java字符串,而是UTF8编码的字符串)。

相关内容

  • 没有找到相关文章

最新更新