如何加速Spark SQL单元测试



我正在评估Spark SQL,以实现一个简单的报告模块(对已经存储在HDFS上的Avro数据进行一些简单的聚合)。我毫不怀疑Spark SQL可以很好地满足我的功能性和非功能性需求。

然而,除了生产需求之外,我还想确保该模块是可测试的。我们遵循BDD方法,使用非常集中的场景,这意味着该模块将需要在一些非常简单的数据(1..10条记录)上运行数十个/数百个SQL查询。

为了大致了解Spark SQL在本地模式下的性能,我快速创建了一些测试的原型:

  1. select count(*) from myTable
  2. select key, count(*) from myTable group by key

第一次测试平均需要100毫秒,但第二次测试需要500毫秒。这样的性能是不可接受的——这会使测试套件太慢。

为了进行比较,我可以使用Crunch及其MemPipeline在10ms内运行相同的测试(在本地模式下使用MRPipeline的测试时间为1500ms),在嵌入式模式下使用Hive的测试时间也为1500ms。因此,Spark SQL在本地模式下比MR快一点,但在构建好的测试套件方面仍然有点慢。

是否可以在本地模式下加速Spark SQL?

有没有更好/更快的方法来测试Spark SQL模块?

(我还没有分析执行情况,但由于RDD上的groupBy().countByKey()平均需要40ms,我预计会发现罪魁祸首是查询优化器)


我的快速&脏测试代码如下:

  SparkConf sparkConf = new SparkConf()
                .setMaster("local[4]")
                .setAppName("poc-sparksql");
  try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        SQLContext sqlCtx = new SQLContext(ctx);
        for (int i = 0; i < ITERATIONS; i++) {
            Stopwatch testCaseSw = new Stopwatch().start();
            DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
            df.registerTempTable("myTable");
            DataFrame result = sqlCtx.sql("select count(*) from myTable");
            System.out.println("Results: " + result.collectAsList());
            System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
        }
        for (int i = 0; i < ITERATIONS; i++) {
            Stopwatch testCaseSw = new Stopwatch().start();
            DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
            df.registerTempTable("myTable");
            DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a ");
            System.out.println("Results: " + result.collectAsList());
            System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
        }
 }

当数据量非常小时,启动太多任务不是一个好选择
在您的第二个选项中,group by将使用200 tasks创建另一个stage,因为您没有设置shuffle partitions属性,默认情况下它是200,并且大多数都是空的。

它可能不会在单个测试中产生影响,但当您有数千个带有shuffle操作的测试时,它可能会产生重大影响。

在spark conf中将"spark.sql.shuffle.partitions"设置为x (where x is local[x])。

实际上,您不需要4 executors来处理少于10条记录,因此可以更好地将执行器的数量减少到1,并将shuffle.paritions设置为1

如果您正在研究ms级别的优化,则有各种指针。

  1. 只读取一次数据,然后缓存多次SQL查询。循环内加载意味着"在每个迭代中生成新任务"
 DataFrame df = sqlCtx.load("/tmp/test.avro","com.databricks.spark.avro");
 df.registerTempTable("myTable");  
 df.cache()
 for (int i = 0; i < ITERATIONS; i++) {
       Stopwatch testCaseSw = new Stopwatch().start();
       DataFrame result = sqlCtx.sql("select count(*) from myTable");
       // Dont do printLn inside the loop , save the output in some hashMap and print it later once the loop is complete
       System.out.println("Results: " + result.collectAsList());
       System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
}
  1. 在循环外提取System.out.println,因为它需要一些时间

请看一下:http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/

我使用Holden Karau开发的spark-testing-base库在Spark中进行单元测试。

在相关的README.md中,您可以找到有关调整资源以分配给单元测试的更多信息。

相关内容

  • 没有找到相关文章

最新更新