我有一个批处理作业,计划将大约2.5亿条记录从HBase表加载到Kafka队列。
批处理最初以大约1250行/秒的速度开始扫描或读取。但在读取大约400万到500万条记录后,读取速度会减慢到90行/秒,并永远保持下去。
我尝试了各种方法来刷新每400万条记录的连接,但速度仍然在放缓。
以下是它的配置和逻辑。
private Configuration mHbaseConf;
private int MAX_HBASE_BATCH_SIZE = 1000;
private void hbaseConfCreation() {
this.mHbaseConf = HBaseConfiguration.create();
this.mHbaseConf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000000);
this.mHbaseConf.setLong(
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 3600000);
this.mHbaseConf.set("hbase.zookeeper.quorum", this.properties
.getProperty("ip.hbase.zookeeper.quorum"));
this.mHbaseConf
.set("hbase.zookeeper.property.clientPort",
this.properties
.getProperty("ip.hbase.zookeeper.property.clientPort"));
}
以下是读取和发布逻辑
HTable table = new HTable(this.mHbaseConf, tableName);
Scan s = new Scan();
s.setCaching(this.MAX_HBASE_BATCH_SIZE);
s.setCacheBlocks(false);
ResultScanner rs = table.getScanner(s);
for (Result result : rs) {
//prepare the value
KafkaMsgPublisher.send(value);
}
kafka消息立即发送,但扫描速度正在减慢。我已经用正确的日志验证了这一点,这些日志显示读取一直在消耗。
我有整个工作单线程迭代循环。我试着每4M条记录重新加载一次配置,但也无济于事。
这项工作需要几天或几个月才能完成。我有什么办法可以改进这一点吗。是什么原因导致了这种放缓。
此问题已于上周解决。问题是,我正在进行文件写入操作,以存储Hbase记录的密钥。我这样做是为了在出现异常时存储密钥。文件写入操作不如单个Hbase记录读取快,并且将时间增加了近100倍。
FileWriter fileWriter = new FileWriter(file, true);
BufferedWriter bufferWriter = new BufferedWriter(fileWriter);
bufferWriter.write(rowKey);
bufferWriter.write("n");
bufferWriter.close();
一旦我评论了这个逻辑并将其移动到Catch块中,这个工作就非常快了。大约2亿条记录在12小时内全部处理完毕。