Hadoop MultipleInputs.addInputPath是否可以递归工作



最新版本的Hadoop已经很容易支持使用FileInputFormat.setInputDirRecursive嵌套输入目录,这依赖于mapreduce.input.fileinputformat.input.dir.recursive配置键。

也可以使用 MultipleInputs.addInputPath 指定多个映射器/输入目录组合。

但是我可以同时做这两件事吗?换句话说,有没有办法指定多个映射器/输入目录组合,其中输入目录以递归方式包含?

一个具体的例子:我有以下目录结构:

  • /dataset1/subdir1/data1.txt
  • /dataset2/subdir2/data2.txt

我尝试了这样的事情:

Job job = Job.getInstance(conf);
FileInputFormat.setInputDirRecursive(job, true);
MultipleInputs.addInputPath(job, new Path("/dataset1"), TextInputFormat.class,
    Mapper1.class);
MultipleInputs.addInputPath(job, new Path("/dataset2"), TextInputFormat.class,
    Mapper2.class);
...
job.waitForCompletion(true);

但后来我得到了一个例外

,就像Error: java.io.IOException: 's3://bucketname/dataset1/subdir1' is a directory

这在 Hadoop 2.4.0 下的 Amazon EMR 中运行。

编辑:Hadoop版本是2.4.0,而不是2.6.0

好吧,不确定 s3,但这是正常的。应指向文件而不是目录。

试试这个。

方法 1

final static public void addInputPathRecursively(FileSystem fs, Path path, PathFilter inputFilter, Job job,boolean swithc) throws IOException
        {
            for (FileStatus stat : fs.listStatus(path, inputFilter))
            {
                if (stat.isDirectory())
                {
                    addInputPathRecursively(fs, stat.getPath(), inputFilter, job);
                } else
                {
                    if (swithc)
                    {
                        MultipleInputs.addInputPath(job, new Path(stat.getPath().toString()), TextInputFormat.class, Mapper1.class);
                    } else
                        MultipleInputs.addInputPath(job, new Path(stat.getPath().toString()), TextInputFormat.class, Mapper2.class);
                }
            }
        }

在驱动程序类中,可以相应地调用它。

addInputPathRecursively(fs, datset1path, new FileFilter(conf, fs,
                        new String[] { txt }), job,true);
addInputPathRecursively(fs, datset2path, new FileFilter(conf, fs,
                        new String[] { txt }), job,false);

这是一个示例,但如果要应用 regEx,则可以正确控制路径过滤器。

  • 方法 2设置这个也应该有魔力。

    FileInputFormat.setInputDirRecursive(job, true(;

  • 方法 3绕过映射器内部并在行级别进行处理。(不是一个好主意!

最新更新