IOException during splitting java.util.concurrent.ExecutionException: java.io.FileNotFoundException



I am trying to bulk load data into HBase using the salted table approach described in this post: https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/. The data does get inserted, but at random times I get

ERROR mapreduce.LoadIncrementalHFiles: IOException during splitting java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: /user//hfile/transactions/transaction_data/b1c6c47856104db0a1289c2b7234d1d7
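For context, the salting approach from that post boils down to prefixing each row key with hash(key) % number-of-regions so that sequential keys are spread across all region servers. A minimal sketch of the idea (an assumption about what the salt() helper used further down roughly does, not its actual implementation):

// Sketch of row-key salting; the real salt() in the job is assumed to be equivalent.
def salt(key: String, numRegions: Int): String = {
  val bucket = math.abs(key.hashCode % numRegions) // bucket in 0 .. numRegions - 1
  bucket + ":" + key
}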

I am using hbase-client 1.2.0 and hbase-server 1.2.0 as dependencies, and I use the HFileOutputFormat2 class to write my HFiles.
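For reference, the dependency section looks roughly like this in sbt (only the two HBase versions above come from my setup; anything else would be illustrative):

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.hbase" % "hbase-client" % "1.2.0",
  "org.apache.hbase" % "hbase-server" % "1.2.0"
)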

I have also tried the patched HFileOutputFormat2 writer from https://github.com/gbif/maps/commit/ee4e0001486f3e8b37b034c5b05fc8c8d4e76ab9, but with it the HFiles failed to be written altogether.

Below is the part of the code that writes the HFiles and loads them into HBase:

dates.map { x =>
  val hbase_connection2 = ConnectionFactory.createConnection(hbaseconfig)
  val table2 = hbase_connection2.getTable(TableName.valueOf(hbase_table))
  val regionLoc2 = hbase_connection2.getRegionLocator(table2.getName)
  val admin2 = hbase_connection2.getAdmin

  // Pull one day's worth of transactions from Hive and work out the paging window
  val transactionsDF = sql(hive_query_2.replace("|columns|", hive_columns) + " " + period_clause.replace("|date|", date_format.format(x)))
  val max_records = transactionsDF.count()
  val max_page = math.ceil(max_records.toDouble / page_limit.toDouble).toInt
  val start_row = 0
  val end_row = page_limit.toInt
  val start_page = if (date_format.format(x).equals(bookmark_date)) bookmarked_page else 0
  val pages = start_page to (if (max_records < page_limit.toInt) max_page - 1 else max_page)

  if (max_records > 0) {
    pages.map { page =>
      // Enrich one page of transactions and stamp every row with a UUID row key and a created timestamp
      val sourceDF = transactionsDF
        .withColumn("tanggal", cnvrt_tanggal(transactionsDF.col("ctry_cd"), transactionsDF.col("ori_tanggal"), transactionsDF.col("ori_jam")))
        .withColumn("jam", cnvrt_jam(transactionsDF.col("ctry_cd"), transactionsDF.col("ori_tanggal"), transactionsDF.col("ori_jam")))
        .join(locations, transactionsDF.col("wsid") === locations.col("key"), "left_outer")
        .join(trandescdictionary, lower(transactionsDF.col("source_system")) === lower(trandescdictionary.col("FLAG_TRANS")) && lower(transactionsDF.col("trans_cd")) === lower(trandescdictionary.col("TRAN_CODE")), "left_outer")
        .filter(transactionsDF.col("rowid").between((start_row + (page * page_limit.toInt)).toString, ((end_row + (page * page_limit.toInt)) - 1).toString))
        .withColumn("uuid", timeUUID())
        .withColumn("created_dt", current_timestamp())

      // Salt the UUID row key and collect the 23 column values, in the order of cols(0) .. cols(22)
      val spp = new SaltPrefixPartitioner(hbase_regions)
      val saltedRDD = sourceDF.rdd.flatMap { r =>
        Seq((salt(r.getString(r.fieldIndex("uuid")), hbase_regions),
          (0 to 22).map(i => r.get(r.fieldIndex(cols(i).toLowerCase)))))
      }
      // Sort and partition by the salted key, then turn every row into one KeyValue per column
      val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(spp)
      val cells = partitionedRDD.flatMap { r =>
        val salted_keys = r._1
        val colFamily = hbase_colfamily
        (0 to 22).map { i =>
          (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)),
            new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(i).getBytes(),
              Bytes.toBytes(Option(r._2(i)).getOrElse("").toString)))
        }
      }
      // Write the cells as HFiles and bulk load them into the table
      val job = Job.getInstance(hbaseconfig, "Insert Transaction Data Row " + (start_row + (page * page_limit.toInt)).toString + " to " + ((end_row + (page * page_limit.toInt)) - 1).toString + " for " + x.toString)
      HFileOutputFormat2.configureIncrementalLoad(job, table2, regionLoc2)
      val conf = job.getConfiguration
      if (fs.exists(path)) {
        fs.delete(path, true)
      }
      cells.saveAsNewAPIHadoopFile(
        path.toString,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        conf
      )
      val bulk_loader = new LoadIncrementalHFiles(conf)
      bulk_loader.doBulkLoad(path, admin2, table2, regionLoc2)
      conf.clear()
      println("Done For " + x.toString + " pages " + (start_row + (page * page_limit.toInt)).toString + " to " + ((end_row + (page * page_limit.toInt)) - 1).toString)

      // Persist a (date, page) bookmark so the job can resume where it stopped
      if (fs.exists(bookmark)) {
        fs.delete(bookmark, true)
      }
      Seq((date_format.format(x), page.toString)).toDF("Date", "Page").write.format("com.databricks.spark.csv").option("delimiter", "|").save(bookmark_path)
      0
    }
  }
  hbase_connection2.close()
  0
}

I am really at my wits' end, as I cannot trace what is causing this error. I hope someone can give me some ideas about what might be causing this file-splitting error.

I think you may be running into this: https://issues.apache.org/jira/projects/HBASE/issues/HBASE-21183

It is something I have only seen occasionally, so we never got to the bottom of it. How often do you see it?

It seems that re-splitting and re-partitioning the data, as was done for HBASE-3871, is the solution.
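A minimal sketch of what that re-partitioning could look like, assuming the saltedRDD and the regionLoc2 RegionLocator from the code above; RegionBoundaryPartitioner is a hypothetical name, not an HBase or Spark class:

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Partitioner

// Partition rows by the table's current region start keys so each HFile written by
// HFileOutputFormat2 falls inside exactly one region and LoadIncrementalHFiles has
// nothing left to split at load time.
class RegionBoundaryPartitioner(startKeys: Array[Array[Byte]]) extends Partitioner {
  override def numPartitions: Int = startKeys.length
  override def getPartition(key: Any): Int = {
    val row = Bytes.toBytes(key.toString)
    // index of the last region whose start key is <= the row key
    val idx = startKeys.lastIndexWhere(sk => Bytes.compareTo(sk, row) <= 0)
    math.max(idx, 0)
  }
}

// usage, in place of (or after) the salt-prefix partitioner:
// val regionAligned = saltedRDD.repartitionAndSortWithinPartitions(
//   new RegionBoundaryPartitioner(regionLoc2.getStartKeys))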

See this code; it is where the error originates:

private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
    AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
    Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
  // <region start key, LQI> need synchronized only within this scope of this
  // phase because of the puts that happen in futures.
  Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
  final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
  Set<String> missingHFiles = new HashSet<>();
  Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

  // drain LQIs and figure out bulk load groups
  Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
  while (!queue.isEmpty()) {
    final LoadQueueItem item = queue.remove();

    final Callable<Pair<List<LoadQueueItem>, String>> call =
        new Callable<Pair<List<LoadQueueItem>, String>>() {
          @Override
          public Pair<List<LoadQueueItem>, String> call() throws Exception {
            Pair<List<LoadQueueItem>, String> splits =
                groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
            return splits;
          }
        };
    splittingFutures.add(pool.submit(call));
  }

  // get all the results. All grouping and splitting must finish before
  // we can attempt the atomic loads.
  for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
    try {
      Pair<List<LoadQueueItem>, String> splits = lqis.get();
      if (splits != null) {
        if (splits.getFirst() != null) {
          queue.addAll(splits.getFirst());
        } else {
          missingHFiles.add(splits.getSecond());
        }
      }
    } catch (ExecutionException e1) {
      Throwable t = e1.getCause();
      if (t instanceof IOException) {
        LOG.error("IOException during splitting", e1);
        throw (IOException) t; // would have been thrown if not parallelized,
      }
      LOG.error("Unexpected execution exception during splitting", e1);
      throw new IllegalStateException(t);
    } catch (InterruptedException e1) {
      LOG.error("Unexpected interrupted exception during splitting", e1);
      throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
    }
  }
  return pair;
}
