MapReduce 作业中的 NullPointerException



我正在尝试使用 Java API 将数据批量加载(bulk load)到 HBase。在调试驱动程序代码时我发现,当框架即将调用 Mapper 类(调试器尚未进入 map 方法)时,会抛出下面的异常。我的 HFile 已经生成,但无法加载到 HBase 中。

16/08/10 04:09:56 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@7363c839
16/08/10 04:09:56 INFO mapred.MapTask: Processing split: file:/home/cloudera/su.txt:0+50
16/08/10 04:09:56 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/08/10 04:09:56 INFO mapred.MapTask: io.sort.mb = 100
16/08/10 04:09:57 INFO mapred.MapTask: data buffer = 79691776/99614720
16/08/10 04:09:57 INFO mapred.MapTask: record buffer = 262144/327680
16/08/10 04:09:57 INFO mapred.LocalJobRunner: Map task executor complete.
16/08/10 04:09:57 WARN mapred.LocalJobRunner: job_local930363008_0001
java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:843)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/08/10 04:09:57 INFO mapred.JobClient:  map 0% reduce 0%
16/08/10 04:09:57 INFO mapred.JobClient: Job complete: job_local930363008_0001
16/08/10 04:09:57 INFO mapred.JobClient: Counters: 0

以下是我用于执行该操作的代码:

package com.sample.bulkload.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HBaseBulkLoad {

    public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split(",");
            String rowKey = values[0];
            // ImmutableBytesWritable HKey = new
            // ImmutableBytesWritable(put.getRow());
            // context.write(HKey, put);
            System.out.println("Entered into Mapper Method");
            Put HPut = new Put(Bytes.toBytes(rowKey));
            HPut.add(Bytes.toBytes("personalDetails"), Bytes.toBytes("first_name"), Bytes.toBytes(values[1]));
            HPut.add(Bytes.toBytes("personalDetails"), Bytes.toBytes("last_name"), Bytes.toBytes(values[2]));
            HPut.add(Bytes.toBytes("contactDetails"), Bytes.toBytes("email"), Bytes.toBytes(values[3]));
            HPut.add(Bytes.toBytes("contactDetails"), Bytes.toBytes("city"), Bytes.toBytes(values[4]));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), HPut);
            System.out.println("Written into Context");
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientport", "2181");
        Job job = new Job(conf, "HBase_Bulk_loader");
        HTable hTable = new HTable(conf, args[2]);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        job.setJarByClass(HBaseBulkLoad.class);
        job.setMapperClass(HBaseBulkLoad.BulkLoadMap.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        job.waitForCompletion(true);
    }
}

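Mapper 的输出键类和输出值类都必须实现 Writable 接口(或者已为它们注册了对应的序列化器);否则 MapTask 在初始化输出缓冲区(MapOutputBuffer.init)时可能因找不到可用的序列化器而抛出 NullPointerException。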

相关内容

  • 没有找到相关文章

最新更新