我的代码算法如下
步骤1.获取一个 hbase 实体数据到 hBaseRDD
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
Step2. 将 hBaseRDD 转换为 rowPairRDD
// in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data
JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
.mapToPair(***);
dataRDD.repartition(500);
dataRDD.cache();
Step3. 将 rowPairRDD 转换为 schemaRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
schemaRDD.registerTempTable("testentity");
sqlContext.sqlContext().cacheTable("testentity");
步骤4. 使用Spark SQL做第一个简单的SQL查询。
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE
column3 = 'value1' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
步骤5. 使用Spark SQL执行第二个简单的SQL查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity
WHERE column3 = 'value2' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
步骤6. 使用Spark SQL执行第三个简单的SQL查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' ");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
测试结果如下:
测试用例1:
当我插入 300,000 条记录时,hbase 实体,然后运行代码。
- 第一个查询需要 60407 ms
- 第二个查询需要 838 毫秒
- 3TD 查询需要 792 毫秒
如果我使用 hbase API 进行类似的查询,则只需要 2000 毫秒。 显然,最后 2 个 Spark sql 查询比 hbase api 查询快得多。
我相信第一个 spark sql 查询花费大量时间从 hbase 加载数据。
因此,第一个查询比最后两个查询慢得多。我认为结果在意料之中
测试用例2:
当我插入 400,000 条记录时。 HBase 实体,然后运行代码。
- 第一个查询需要 87213 毫秒
- 第二个查询需要 83238 ms
- 3TD 查询需要 82092 毫秒
如果我使用 hbase API 进行类似的查询,则只需要 3500 毫秒。 显然,3 个 Spark sql 查询比 hbase api 查询慢得多。
而且最后 2 个 spark sql 查询也非常慢,性能与第一个查询相似,为什么?如何调整性能?
我怀疑您正在尝试缓存比分配给Spark实例的数据更多的数据。我将尝试分解完全相同查询的每次执行中发生的情况。
首先,Spark 中的所有内容都是懒惰的。 这意味着当你调用rdd.cache()
时,实际上什么都不会发生,直到你对RDD做一些事情。
第一个查询
- 全HBase扫描(慢速)
- 增加分区数(导致随机播放、速度变慢)
- 数据实际上缓存到内存中,因为Spark是懒惰的(有点慢)
- 应用位置谓词(快速)
- 收集结果
第二次/第三次查询
- 完全内存扫描(快速)
- 应用位置谓词(快速)
- 收集结果
现在,Spark将尝试缓存尽可能多的RDD。 如果它无法缓存整个内容,您可能会遇到一些严重的减速。 如果缓存前的某个步骤导致随机播放,则尤其如此。对于每个后续查询,您可能会在第一个查询中重复步骤 1 - 3。 这并不理想。
要查看您是否没有完全缓存RDD,请转到Spark Web UI(如果处于本地独立模式http://localhost:4040
)并查找RDD存储/持久性信息。 确保它处于 100%。
编辑(每条评论):
我的 hbase 中的 400,000 个数据大小只有大约 250MB。为什么我需要使用2G来解决问题(但是1G>>250MB)
我不能确定为什么你用spark.executor.memory=1G
达到最大限制,但我会添加一些关于缓存的更多相关信息。
- Spark 仅将执行程序堆内存的一定百分比分配给缓存。 默认情况下,此值为
spark.storage.memoryFraction=0.6
或 60%。 所以你真的只会得到1GB * 0.6
. - HBase 中使用的总空间可能与在 Spark 中缓存时占用的总堆空间不同。 默认情况下,Spark 在内存中存储时不会序列化 Java 对象。 因此,存储 Java
Object
元数据会产生相当大的开销。 您可以更改默认持久性级别。
您知道如何缓存所有数据以避免第一次查询的性能不佳吗?
调用任何操作都将导致 RDD 被缓存。 就这样做
scala> rdd.cache
scala> rdd.count
现在它已缓存。
您一次性一个接一个地运行这些查询,如果是,为什么要为每个查询创建单独的sqlContext?您也可以尝试对RDD进行重新分区,这将增加并行性。如果可能的话,还要缓存RDD。
希望上述步骤能够提高性能。