激发 SQL 性能



我的代码算法如下
步骤1.获取一个 hbase 实体数据到 hBaseRDD

      JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
                 jsc.newAPIHadoopRDD(hbase_conf,  TableInputFormat.class,
                 ImmutableBytesWritable.class, Result.class); 

Step2. 将 hBaseRDD 转换为 rowPairRDD

     // in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data 
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD 
                            .mapToPair(***); 
    dataRDD.repartition(500);
        dataRDD.cache();

Step3. 将 rowPairRDD 转换为 schemaRDD

            JavaSchemaRDD schemaRDD =   sqlContext.applySchema(rowPairRDD.values(), schema); 
            schemaRDD.registerTempTable("testentity"); 
           sqlContext.sqlContext().cacheTable("testentity");

步骤4. 使用Spark SQL做第一个简单的SQL查询。

   JavaSQLContext  sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE 
             column3 = 'value1' ") 
     List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

步骤5. 使用Spark SQL执行第二个简单的SQL查询。

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity 
                                     WHERE column3 = 'value2' ") 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

步骤6. 使用Spark SQL执行第三个简单的SQL查询。

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' "); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

测试结果如下:

测试用例1

当我插入 300,000 条记录时,hbase 实体,然后运行代码。

  • 第一个查询需要 60407 ms
  • 第二个查询需要 838 毫秒
  • 3TD 查询需要 792 毫秒

如果我使用 hbase API 进行类似的查询,则只需要 2000 毫秒。 显然,最后 2 个 Spark sql 查询比 hbase api 查询快得多。
我相信第一个 spark sql 查询花费大量时间从 hbase 加载数据。
因此,第一个查询比最后两个查询慢得多。我认为结果在意料之中

测试用例2

当我插入 400,000 条记录时。 HBase 实体,然后运行代码。

  • 第一个查询需要 87213 毫秒
  • 第二个查询需要 83238 ms
  • 3TD 查询需要 82092 毫秒

如果我使用 hbase API 进行类似的查询,则只需要 3500 毫秒。 显然,3 个 Spark sql 查询比 hbase api 查询慢得多。
而且最后 2 个 spark sql 查询也非常慢,性能与第一个查询相似,为什么?如何调整性能?

我怀疑您正在尝试缓存比分配给Spark实例的数据更多的数据。我将尝试分解完全相同查询的每次执行中发生的情况。

首先,Spark 中的所有内容都是懒惰的。 这意味着当你调用rdd.cache()时,实际上什么都不会发生,直到你对RDD做一些事情。

第一个查询

  1. 全HBase扫描(慢速)
  2. 增加分区数(导致随机播放、速度变慢)
  3. 数据实际上缓存到内存中,因为Spark是懒惰的(有点慢)
  4. 应用位置谓词(快速)
  5. 收集结果

第二次/第三次查询

  1. 完全内存扫描(快速)
  2. 应用位置谓词(快速)
  3. 收集结果

现在,Spark将尝试缓存尽可能多的RDD。 如果它无法缓存整个内容,您可能会遇到一些严重的减速。 如果缓存的某个步骤导致随机播放,则尤其如此。对于每个后续查询,您可能会在第一个查询中重复步骤 1 - 3。 这并不理想。

要查看您是否没有完全缓存RDD,请转到Spark Web UI(如果处于本地独立模式http://localhost:4040)并查找RDD存储/持久性信息。 确保它处于 100%。

编辑(每条评论):

我的 hbase 中的 400,000 个数据大小只有大约 250MB。为什么我需要使用2G来解决问题(但是1G>>250MB)

我不能确定为什么你用spark.executor.memory=1G达到最大限制,但我会添加一些关于缓存的更多相关信息。

  • Spark 仅将执行程序堆内存的一定百分比分配给缓存。 默认情况下,此值为 spark.storage.memoryFraction=0.6 或 60%。 所以你真的只会得到1GB * 0.6.
  • HBase 中使用的总空间可能与在 Spark 中缓存时占用的总堆空间不同。 默认情况下,Spark 在内存中存储时不会序列化 Java 对象。 因此,存储 Java Object元数据会产生相当大的开销。 您可以更改默认持久性级别。

您知道如何缓存所有数据以避免第一次查询的性能不佳吗?

调用任何操作都将导致 RDD 被缓存。 就这样做

scala> rdd.cache
scala> rdd.count

现在它已缓存。

我希望

您一次性一个接一个地运行这些查询,如果是,为什么要为每个查询创建单独的sqlContext?您也可以尝试对RDD进行重新分区,这将增加并行性。如果可能的话,还要缓存RDD。

希望上述步骤能够提高性能。

相关内容

  • 没有找到相关文章

最新更新