Spark只写入一个hbase区域服务器


import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.PairRDDFunctions
def bulkWriteToHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sinkTableName: String, outRDD: RDD[(ImmutableBytesWritable, Put)]): Unit = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sinkTableName)
val hJob = Job.getInstance(hConf)
hJob.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, sinkTableName)
hJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) 
outRDD.saveAsNewAPIHadoopDataset(hJob.getConfiguration())
}

通过使用这个hbase批量插入,我发现每次spark都只会从hbase写入一个区域服务器,这就成为了瓶颈。

然而,当我使用几乎相同的方法但从hbase读取时,它使用多个执行器来进行并行读取。

def bulkReadFromHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sourceTableName: String) = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sourceTableName)
val inputRDD = sparkContext.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
inputRDD
}

有人能解释一下为什么会发生这种情况吗?或者我有对spark-hbase批量I/O使用了错误的方式?

问题:我对spark-hbase批量I/O使用了错误的方法?

不,你的方法是正确的,尽管,你需要在手动之前预先分割区域&创建带有预拆分区域的表

例如create 'test_table', 'f1', SPLITS=> ['1', '2', '3', '4', '5', '6', '7', '8', '9']

上表占据了9个区域。。

设计好的rowkey将从1-9 开始

你可以使用番石榴杂音散列如下。

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
/**
* getMurmurHash.
* 
* @param content
* @return HashCode
*/
public static HashCode getMurmurHash(String content) {
final HashFunction hf = Hashing.murmur3_128();
final HashCode hc = hf.newHasher().putString(content, Charsets.UTF_8).hash();
return hc;
}
final long hash = getMurmur128Hash(Bytes.toString(yourrowkey as string)).asLong();
final int prefix = Math.abs((int) hash % 9);

现在将这个前缀附加到您的行密钥

例如

1rowkey1//将进入第一个区域
2rowkey2//将进入第二个区域
3rowkey3//将进入第三个区域。。。9箭头键9//将进入第九区域

如果您正在进行预拆分,并且希望手动管理区域拆分,还可以通过将hbase.hregion.max.filesize设置为一个大数字并将拆分策略设置为ConstantSizeRegionSplitPolicy来禁用区域拆分。但是,您应该使用100GB这样的保护值,这样区域就不会增长到超出区域服务器的能力。您可以考虑禁用自动拆分,并依赖于预拆分的初始区域集。例如,如果您对密钥前缀使用统一哈希,并且您可以确保表中每个区域的读/写负载及其大小是一致的

1) 请确保在将数据加载到hbase表之前可以预拆分表2)使用murmurhash或其他哈希技术设计好的行键,如下所述。以确保各区域的均匀分布
另请参阅http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/

问题:有人能解释一下为什么会发生这种情况吗?

原因非常明显和简单由于该表的行键不好,数据被热点化到一个特定原因。。。

考虑一个java中的hashmap,它包含哈希代码为1234的元素。那么它将把所有元素都放在一个桶里,不是吗?如果hashmap元素分布在不同的好的hashcode上,那么它将把元素放在不同的桶中。hbase也是如此。这里你的hashcode就像你的rowkey

此外

如果我已经有一个表,并且我想分割区域,会发生什么穿过

RegionSplitter类为那些选择手动分割区域而不是由HBase自动处理区域的开发人员提供了几个实用程序,以帮助他们在管理生命周期中进行管理。

最有用的实用程序有:

  • 创建一个具有指定数量的预拆分区域的表
  • 对现有表上的所有区域执行滚动拆分

示例:

$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1

其中-c 10,将请求的区域数指定为10,-f指定表中所需的列族,用":"分隔。该工具将创建一个名为"test_table"的表,其中包含10个区域:

13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...

正如评论中所讨论的,您发现我在写入hbase之前的最后一个RDD只有1个分区!这表明只有一个执行者持有全部数据。。。我还在努力找出原因

此外,检查

spark.default.parallelism默认为所有机器。并行化api没有父RDD来确定分区数,因此它使用spark.default.parallelism

因此您可以通过重新分区来增加分区

注:我观察到,在Mapreduce中,区域的分区数/输入分割数=启动的映射程序数。。类似地,在您的情况下,数据加载到一个特定区域的情况可能相同,这就是一个执行器启动的原因。请核实一下

尽管您没有提供示例数据或足够的解释,但这主要不是由于您的代码或配置造成的。这种情况之所以会发生,是因为行键设计不是最优的。您正在写入的数据的键(hbase-rowkey)结构不正确(可能是单调递增或其他原因)。因此,正在对其中一个区域进行写入。您可以通过各种方式(行键设计的各种推荐做法,如盐析、反转和其他技术)来防止这种情况的发生。如需参考,请参阅http://hbase.apache.org/book.html#rowkey.design

在这种情况下,如果您想知道是对所有区域并行写入还是逐个写入(问题不清楚),请查看以下内容:http://hbase.apache.org/book.html#_bulk_load.

最新更新