I am trying to do a BulkLoad into HBase. The input to the map-reduce job is an HDFS file (coming from Hive). In the Tool (Job) class, the bulk-load process is set up with the following call:

HFileOutputFormat.configureIncrementalLoad(job, new HTable(config, TABLE_NAME));

In the Mapper, the following is used as the Mapper's output:

context.write(new ImmutableBytesWritable(Bytes.toBytes(hbaseTable)), put);

Once the mapper finishes, the actual bulk load is performed with:

LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);
loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);
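For context, a minimal sketch of what the driver (Tool) class described above could look like, under the old HTable-based API the question uses. The class name BulkLoadDriver, the mapper class MyMapper, and the table name are illustrative, not the original code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create(getConf());
        Job job = Job.getInstance(config, "hfile-generation");
        job.setJarByClass(BulkLoadDriver.class);
        job.setMapperClass(MyMapper.class); // the Mapper described above (hypothetical name)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // Hive output on HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HFile output folder
        // Same call as in the question: wires in the output format,
        // total-order partitioner and reducer needed to produce HFiles
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(config, "TABLE_NAME"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new BulkLoadDriver(), args));
    }
}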
The job runs fine, but once the LoadIncrementalHFiles step starts it hangs forever; after several attempts I had to kill the job. After a long wait, maybe 30 minutes, I finally got the error mentioned above. After extensive searching I found that HBase tries to access the files (HFiles) placed in the output folder, but that folder does not grant it write or execute permission, which throws the error. So one workaround is to set the file permissions in the Java code before performing the bulk load, as follows:
FileSystem fileSystem = FileSystem.get(config);
fileSystem.setPermission(new Path(outputPath),FsPermission.valueOf("drwxrwxrwx"));
Is this the right approach as we move from development to production? Also, once I added the code above, I got a similar error for a folder created inside the output folder, this time the column-family folder, which is created dynamically at runtime.

As a temporary workaround I did the following and was able to move ahead:

fileSystem.setPermission(new Path(outputPath + "/col_fam_folder"), FsPermission.valueOf("drwxrwxrwx"));
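A generalized form of that workaround would walk the output directory and open up every subdirectory instead of hard-coding the family folder name. A sketch of that idea (the directory walk is illustrative, not my original code; it uses org.apache.hadoop.fs.FileStatus):

FileSystem fileSystem = FileSystem.get(config);
Path out = new Path(outputPath);
fileSystem.setPermission(out, FsPermission.valueOf("drwxrwxrwx"));
// the column-family folders are only known at runtime, so list them instead of hard-coding
for (FileStatus status : fileSystem.listStatus(out)) {
    if (status.isDirectory()) {
        fileSystem.setPermission(status.getPath(), FsPermission.valueOf("drwxrwxrwx"));
    }
}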
Still, both of these steps feel like workarounds; I need a proper solution before moving to production. Thanks in advance.
Try this: System.setProperty("HADOOP_USER_NAME", "hadoop");
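If you take this route, the property has to be set before the first FileSystem/HBase call, since the Hadoop login user is resolved lazily on first access. A minimal placement sketch (the user name "hadoop" is just the example above):

// must run before FileSystem.get(...) so the HFiles are written as that user
System.setProperty("HADOOP_USER_NAME", "hadoop");
FileSystem fileSystem = FileSystem.get(config);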
Secure bulk load seems to be an appropriate answer. This thread explains a sample implementation. The code snippet is copied below:
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
String keyTab = "pathtokeytabfile";
String tableName = "tb_name";
String pathToHFile = "/tmp/tmpfiles/";
Configuration configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum","ZK_QUORUM");
configuration.set("hbase.zookeeper"+ ".property.clientPort","2181");
configuration.set("hbase.master","MASTER:60000");
configuration.set("hadoop.security.authentication", "Kerberos");
configuration.set("hbase.security.authentication", "kerberos");
//Obtaining kerberos authentication
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab("name of the keytab user", keyTab);
HBaseAdmin.checkHBaseAvailable(configuration);
System.out.println("HBase is running!");
HBaseConfiguration.addHbaseResources(configuration);
Connection conn = ConnectionFactory.createConnection(configuration);
Table table = conn.getTable(TableName.valueOf(tableName));
HRegionInfo tbInfo = new HRegionInfo(table.getName());
//path to the HFiles that need to be loaded
Path hfofDir = new Path(pathToHFile);
//acquiring user token for authentication
UserProvider up = UserProvider.instantiate(configuration);
FsDelegationToken fsDelegationToken = new FsDelegationToken(up, "name of the key tab user");
fsDelegationToken.acquireDelegationToken(hfofDir.getFileSystem(configuration));
//preparing for the bulk load
SecureBulkLoadClient secureBulkLoadClient = new SecureBulkLoadClient(table);
String bulkToken = secureBulkLoadClient.prepareBulkLoad(table.getName());
System.out.println(bulkToken);
//creating the family list (list of family names and path to the hfile corresponding to the family name)
final List<Pair<byte[], String>> famPaths = new ArrayList<>();
Pair<byte[], String> p = new Pair<>();
//name of the family
p.setFirst("nameofthefamily".getBytes());
//path to the HFile (HFile are organized in folder with the name of the family)
p.setSecond("/tmp/tmpfiles/INTRO/nameofthefilehere");
famPaths.add(p);
//bulk loading ,using the secure bulk load client
secureBulkLoadClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), bulkToken, tbInfo.getStartKey());
System.out.println("Bulk Load Completed..");