我的 Hbase 表有 3000 万条记录,每条记录都有列raw:sample
,raw 是列族样本是列。此列非常大,大小从几 KB 到 50MB 不等。当我运行以下 Spark 代码时,它只能获得 40,000 条记录,但我应该获得 3000 万条记录:
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "10.1.1.15:2181")
conf.set(TableInputFormat.INPUT_TABLE, "sampleData")
conf.set(TableInputFormat.SCAN_COLUMNS, "raw:sample")
conf.set("hbase.client.keyvalue.maxsize","0")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
var arrRdd:RDD[Map[String,Object]] = hBaseRDD.map(tuple => tuple._2).map(...)
现在,我通过首先获取 id 列表然后迭代 id 列表来解决此问题,以获取由 Spark foreach 中的纯 Hbase java 客户端raw:sample
的列。有什么想法,为什么我不能通过Spark获得所有raw:sample
列,是因为列太大吗?
几天前,我的一个 zookeeper 节点和数据节点关闭了,但由于副本是 3,我很快就修复了它,这是原因吗?会认为如果我运行hbck -repair
会有所帮助,非常感谢!
在内部,TableInputFormat 创建一个 Scan 对象,以便从 HBase 检索数据。
尝试创建一个 Scan 对象(不使用 Spark),配置为从 HBase 检索相同的列,查看错误是否重复:
// Instantiating Configuration class
Configuration config = HBaseConfiguration.create();
// Instantiating HTable class
HTable table = new HTable(config, "emp");
// Instantiating the Scan class
Scan scan = new Scan();
// Scanning the required columns
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("name"));
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("city"));
// Getting the scan result
ResultScanner scanner = table.getScanner(scan);
// Reading values from scan result
for (Result result = scanner.next(); result != null; result = scanner.next())
System.out.println("Found row : " + result);
//closing the scanner
scanner.close();
此外,默认情况下,TableInputFormat 配置为从 HBase 服务器请求非常小的数据块(这是错误的,会导致大量开销)。设置以下内容以增加区块大小:
scan.setBlockCache(false);
scan.setCaching(2000);
对于像您这样的高吞吐量,Apache Kafka 是集成数据流和保持数据管道活动的最佳解决方案。有关 kafka 的一些用例,请参阅 http://kafka.apache.org/08/uses.html
再来一个http://sites.computer.org/debull/A12june/pipeline.pdf