Hadoop:具有自定义输入格式的 NullPointerException



我已经为Hadoop开发了一个自定义InputFormat(包括自定义InputSplit和自定义RecordReader),我遇到了一个罕见的NullPointerException

这些类将用于查询第三方系统,该系统公开用于检索记录的 REST API。因此,我在DBInputFormat中获得了灵感,这也是一个非HDFS InputFormat

我得到的错误如下:

Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:762)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

我已经搜索了MapTask(Hadoop的2.1.0版本)的代码,我看到有问题的部分是RecordReader的初始化:

472 NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
473       org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
474       TaskReporter reporter,
475       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
476       throws InterruptedException, IOException {
...
491    this.real = inputFormat.createRecordReader(split, taskContext);
...
494 }
...
519 @Override
520 public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
521       org.apache.hadoop.mapreduce.TaskAttemptContext context
522       ) throws IOException, InterruptedException {
523    long bytesInPrev = getInputBytes(fsStats);
524    real.initialize(split, context);
525    long bytesInCurr = getInputBytes(fsStats);
526    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
527 }

当然,我的代码的相关部分:

# MyInputFormat.java
public static void setEnvironmnet(Job job, String host, String port, boolean ssl, String APIKey) {
    backend = new Backend(host, port, ssl, APIKey);
}
public static void addResId(Job job, String resId) {
    Configuration conf = job.getConfiguration();
    String inputs = conf.get(INPUT_RES_IDS, "");
    if (inputs.isEmpty()) {
        inputs += restId;
    } else {
        inputs += "," + resId;
    }
    conf.set(INPUT_RES_IDS, inputs);
}
@Override
public List<InputSplit> getSplits(JobContext job) {
    // resulting splits container
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // get the Job configuration
    Configuration conf = job.getConfiguration();
    // get the inputs, i.e. the list of resource IDs
    String input = conf.get(INPUT_RES_IDS, "");
    String[] resIDs = StringUtils.split(input);
    // iterate on the resIDs
    for (String resID: resIDs) {
       splits.addAll(getSplitsResId(resID, job.getConfiguration()));
    }
    // return the splits
    return splits;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    if (backend == null) {
        logger.info("Unable to create a MyRecordReader, it seems the environment was not properly set");
        return null;
    }
    // create a record reader
    return new MyRecordReader(backend, split, context);
}
# MyRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    // get start, end and current positions
    MyInputSplit inputSplit = (MyInputSplit) this.split;
    start = inputSplit.getFirstRecordIndex();
    end = start + inputSplit.getLength();
    current = 0;
    // query the third-party system for the related resource, seeking to the start of the split
    records = backend.getRecords(inputSplit.getResId(), start, end);
}
# MapReduceTest.java
public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MapReduceTest(), args);
    System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = Job.getInstance(conf, "MapReduce test");
    job.setJarByClass(MapReduceTest.class);
    job.setMapperClass(MyMap.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(MyInputFormat.class);
    MyInputFormat.addInput(job, "ca73a799-9c71-4618-806e-7bd0ca1911f4");
    InputFormat.setEnvironmnet(job, "my.host.com", "443", true, "my_api_key");
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    return job.waitForCompletion(true) ? 0 : 1;
}

有什么想法是错误的吗?

顺便说一句,RecordReader必须使用的"好"InputSplit哪个是给构造函数的还是在initialize方法中给出的?无论如何,我已经尝试了这两个选项,结果错误是相同的:)

我读取您的跟踪跟踪real的方式在第 524 行为空。

但不要相信我的话。 把assertsystem.out.println塞进去,自己检查real的价值。

NullPointerException几乎总是意味着你点掉了一些你没想到是空的东西。 一些图书馆和馆藏会把它扔给你,作为他们说"这不能为空"的方式。

Error: java.lang.NullPointerException at
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:524)

对我来说,这读作:在org.apache.hadoop.mapred包中,MapTask类有一个内部类NewTrackingRecordReader,其中包含一个initialize方法,该方法在第 524 行抛出了一个NullPointerException

524 real.initialize( blah, blah) // I actually stopped reading after the dot

this.real设置在491行。

491 this.real = inputFormat.createRecordReader(split, taskContext);

假设您没有遗漏任何范围更紧密的掩盖this.real real那么我们需要看看inputFormat.createRecordReader(split, taskContext);如果这可以返回null那么它可能是罪魁祸首。

事实证明,当backend为 null 时,它将返回null

@Override
public RecordReader<LongWritable, Text> createRecordReader(
    InputSplit split, 
    TaskAttemptContext context) {
    if (backend == null) {
        logger.info("Unable to create a MyRecordReader, " + 
                    "it seems the environment was not properly set");
        return null;
    }
    // create a record reader
    return new MyRecordReader(backend, split, context);
}

看起来setEnvironmnet应该设置backend

# MyInputFormat.java
public static void setEnvironmnet(
    Job job, 
    String host, 
    String port, 
    boolean ssl, 
    String APIKey) {
    backend = new Backend(host, port, ssl, APIKey);
}

backend必须在setEnvironment之外的某个位置声明(否则会收到编译器错误)。

如果backend在构造时未设置为非 null 并且之前未调用setEnvironmnet createRecordReader那么您应该期望得到您得到的NullPointerException

更新:

正如你所指出的,由于setEnvironmnet()是静态的backend也必须是静态的。 这意味着您必须确保其他实例未将其设置为 null。

已解决。问题是backend变量被声明为static,即它属于java类,因此任何其他改变该变量的对象(例如null)都会影响同一类的所有其他对象。

现在,setEnvironment添加主机、端口、SSL 用法和 API 密钥作为配置(与setResId已经对资源 ID 所做的相同);当调用createRecordReader时,将获取此配置并创建backend对象。

感谢蜜饯橙子让我走上了正确的道路!

相关内容

  • 没有找到相关文章

最新更新