我正在尝试使用 Java API 将数据批量加载(bulk load)到 HBase。当调用 Mapper 类时,我得到了以下异常。这是我在调试驱动程序代码时发现的:当调试器即将进入 Mapper 代码时,就会出现此错误。我的 HFile 已创建,但无法加载到 HBase。
16/08/10 04:09:56 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@7363c839
16/08/10 04:09:56 INFO mapred.MapTask: Processing split: file:/home/cloudera/su.txt:0+50
16/08/10 04:09:56 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/08/10 04:09:56 INFO mapred.MapTask: io.sort.mb = 100
16/08/10 04:09:57 INFO mapred.MapTask: data buffer = 79691776/99614720
16/08/10 04:09:57 INFO mapred.MapTask: record buffer = 262144/327680
16/08/10 04:09:57 INFO mapred.LocalJobRunner: Map task executor complete.
16/08/10 04:09:57 WARN mapred.LocalJobRunner: job_local930363008_0001
java.lang.Exception: java.lang.NullPointerException
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:843)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/08/10 04:09:57 INFO mapred.JobClient: map 0% reduce 0%
16/08/10 04:09:57 INFO mapred.JobClient: Job complete: job_local930363008_0001
16/08/10 04:09:57 INFO mapred.JobClient: Counters: 0
以下是我执行该操作的代码:
package com.sample.bulkload.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HBaseBulkLoad {
public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(",");
String rowKey = values[0];
// ImmutableBytesWritable HKey = new
// ImmutableBytesWritable(put.getRow());
// context.write(HKey, put);
System.out.println("Entered into Mapper Method");
Put HPut = new Put(Bytes.toBytes(rowKey));
HPut.add(Bytes.toBytes("personalDetails"), Bytes.toBytes("first_name"), Bytes.toBytes(values[1]));
HPut.add(Bytes.toBytes("personalDetails"), Bytes.toBytes("last_name"), Bytes.toBytes(values[2]));
HPut.add(Bytes.toBytes("contactDetails"), Bytes.toBytes("email"), Bytes.toBytes(values[3]));
HPut.add(Bytes.toBytes("contactDetails"), Bytes.toBytes("city"), Bytes.toBytes(values[4]));
context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), HPut);
System.out.println("Written into Context");
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientport", "2181");
Job job = new Job(conf, "HBase_Bulk_loader");
HTable hTable = new HTable(conf, args[2]);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);
job.setJarByClass(HBaseBulkLoad.class);
job.setMapperClass(HBaseBulkLoad.BulkLoadMap.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
HFileOutputFormat.configureIncrementalLoad(job, hTable);
job.waitForCompletion(true);
}
}
Mapper 的输出键类和输出值类都必须实现 Writable 接口。