如何在Hadoop Map/Reduce作业中访问分布式缓存



我正在尝试使用 GenericOptionsParser-files 标志将一个小文件传递给我正在运行的作业:

$ hadoop jar MyJob.jar -conf /path/to/cluster-conf.xml -files /path/to/local-file.csv data/input data/output

这应该将作业发送到我的集群并附加本地文件.csv以便在需要时可用于映射器/化简器。当我在伪分布式模式下运行它时,它工作得很好,但是当我在集群上启动作业时,似乎找不到该文件。我正在映射器的setup方法中读取文件,如下所示:

public static class TheMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  public void setup(Context context) throws IOException, InterruptedException {
    URI[] uriList = DistributedCache.getCacheFiles( context.getConfiguration() );
    CsvReader csv = new CsvReader(uriList[0].getPath());
    // work with csv file..
  }
  // ..
}

当作业正在运行时,我收到以下异常:

java.io.FileNotFoundException: File /hdfs/tmp/mapred/staging/hduser/.staging/job_201205112311_011/files/local-file.csv does not exist.
at com.csvreader.CsvReader.<init>(Unknown Source)
at com.csvreader.CsvReader.<init>(Unknown Source)
at com.csvreader.CsvReader.<init>(Unknown Source)
at MyJob$TheMapper.setup(MyJob.java:167)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
...

知道我做错了什么吗?谢谢。

这是一个常见问题 - -files 选项可用作分布式缓存之外的选项。

使用 -files 时,GenericOptionsParser 配置一个名为 tmpfiles 的作业属性,而分布式缓存使用名为 mapred.cache.files 的属性。

此外,分布式缓存期望文件已经在 HDFS 中,并将它们复制到任务节点,其中 -files 在作业提交时复制到 HDFS 的文件,然后将它们复制到每个任务节点。

在您的情况下,要使您的代码正常工作,只需创建一个File对象并命名您传入的文件(显然这需要您知道本地文件的文件名,并将其硬编码到映射器代码中)。该文件将位于当前工作目录中:

@Override
public void setup(Context context) throws IOException, InterruptedException {
  CsvReader csv = new CsvReader(new File("local-file.csv"));
  // work with csv file..

}

相关内容

  • 没有找到相关文章

最新更新