最新版本的Hadoop已经很容易支持使用FileInputFormat.setInputDirRecursive
嵌套输入目录,这依赖于mapreduce.input.fileinputformat.input.dir.recursive
配置键。
也可以使用 MultipleInputs.addInputPath
指定多个映射器/输入目录组合。
但是我可以同时做这两件事吗?换句话说,有没有办法指定多个映射器/输入目录组合,其中输入目录以递归方式包含?
一个具体的例子:我有以下目录结构:
-
/dataset1/subdir1/data1.txt
-
/dataset2/subdir2/data2.txt
我尝试了这样的事情:
Job job = Job.getInstance(conf);
FileInputFormat.setInputDirRecursive(job, true);
MultipleInputs.addInputPath(job, new Path("/dataset1"), TextInputFormat.class,
Mapper1.class);
MultipleInputs.addInputPath(job, new Path("/dataset2"), TextInputFormat.class,
Mapper2.class);
...
job.waitForCompletion(true);
但后来我得到了一个例外
,就像Error: java.io.IOException: 's3://bucketname/dataset1/subdir1' is a directory
这在 Hadoop 2.4.0 下的 Amazon EMR 中运行。
编辑:Hadoop版本是2.4.0,而不是2.6.0
好吧,不确定 s3,但这是正常的。应指向文件而不是目录。
试试这个。
方法 1
final static public void addInputPathRecursively(FileSystem fs, Path path, PathFilter inputFilter, Job job,boolean swithc) throws IOException
{
for (FileStatus stat : fs.listStatus(path, inputFilter))
{
if (stat.isDirectory())
{
addInputPathRecursively(fs, stat.getPath(), inputFilter, job);
} else
{
if (swithc)
{
MultipleInputs.addInputPath(job, new Path(stat.getPath().toString()), TextInputFormat.class, Mapper1.class);
} else
MultipleInputs.addInputPath(job, new Path(stat.getPath().toString()), TextInputFormat.class, Mapper2.class);
}
}
}
在驱动程序类中,可以相应地调用它。
addInputPathRecursively(fs, datset1path, new FileFilter(conf, fs,
new String[] { txt }), job,true);
addInputPathRecursively(fs, datset2path, new FileFilter(conf, fs,
new String[] { txt }), job,false);
这是一个示例,但如果要应用 regEx,则可以正确控制路径过滤器。
方法 2设置这个也应该有魔力。
FileInputFormat.setInputDirRecursive(job, true(;
方法 3绕过映射器内部并在行级别进行处理。(不是一个好主意!