使用MapReduce中的globStatus过滤输入文件



我有很多输入文件,我想根据最后附加的日期处理选定的文件。我现在很困惑,我在哪里使用globStatus方法来过滤掉文件。

我有一个自定义RecordReader类,我试图在其下一个方法中使用globStatus,但它没有成功。

public boolean next(Text key, Text value) throws IOException {
    Path filePath = fileSplit.getPath();
    if (!processed) {
        key.set(filePath.getName());
        byte[] contents = new byte[(int) fileSplit.getLength()];
        value.clear();
        FileSystem fs = filePath.getFileSystem(conf);
        fs.globStatus(new Path("/*" + date));
        FSDataInputStream in = null;
        try {
            in = fs.open(filePath);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }
    return false;
}

我知道它返回一个FileStatus数组,但是我如何使用它来过滤文件呢?有人能告诉我吗?

globStatus方法有2个附加参数,允许您过滤文件。第一个是glob模式,但有时glob模式不够强大,无法过滤特定的文件,在这种情况下,可以定义一个PathFilter

对于glob模式,支持以下操作:

Glob   | Matches
-------------------------------------------------------------------------------------------------------------------
*      | Matches zero or more characters
?      | Matches a single character
[ab]   | Matches a single character in the set {a, b}
[^ab]  | Matches a single character not in the set {a, b}
[a-b]  | Matches a single character in the range [a, b] where a is lexicographically less than or equal to b
[^a-b] | Matches a single character not in the range [a, b] where a is lexicographically less than or equal to b
{a,b}  | Matches either expression a or b
c     | Matches character c when it is a metacharacter

PathFilter就是这样一个简单的接口:

public interface PathFilter {
    boolean accept(Path path);
}

所以你可以实现这个接口,并实现accept方法,你可以把你的逻辑过滤文件。

摘自Tom White的一本好书中的一个例子,它允许你定义一个PathFilter来过滤匹配某个正则表达式的文件:

public class RegexExcludePathFilter implements PathFilter {
    private final String regex;
    public RegexExcludePathFilter(String regex) {
        this.regex = regex;
    }
    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
}

您可以在初始化作业时通过调用FileInputFormat.setInputPathFilter(JobConf, RegexExcludePathFilter.class)来直接使用PathFilter实现过滤输入。

EDIT:由于必须在setInputPathFilter中传递类,因此不能直接传递参数,但应该能够通过使用Configuration来做类似的事情。如果你让你的RegexExcludePathFilter也从Configured扩展,你可以得到一个Configuration对象,你之前已经初始化了所需的值,所以你可以在你的过滤器中得到这些值,并在accept中处理它们。

例如:

conf.set("date", "2013-01-15");
然后你可以这样定义你的过滤器:
public class RegexIncludePathFilter extends Configured implements PathFilter {
    private String date;
    private FileSystem fs;
    public boolean accept(Path path) {
        try {
            if (fs.isDirectory(path)) {
                return true;
            }
        } catch (IOException e) {}
        return path.toString().endsWith(date);
    }
    public void setConf(Configuration conf) {
        if (null != conf) {
            this.date = conf.get("date");
            try {
                this.fs = FileSystem.get(conf);
            } catch (IOException e) {}
        }
    }
}

EDIT 2:原始代码有一些问题,请参阅更新后的类。您还需要删除构造函数,因为它不再被使用,并检查它是否是一个目录,在这种情况下,您应该返回true,以便目录的内容也可以被过滤。

对于任何读到这篇文章的人,我可以说"请不要在过滤器中做任何比验证路径更复杂的事情"。具体来说:不要检查文件是否为目录,获取其大小等。等待,直到list/glob操作返回,然后使用现在填充的FileStatus条目中的信息在那里进行过滤。

为什么?所有对getFileStatus()的调用,无论是直接调用还是通过isDirectory()调用,都是对文件系统进行不必要的调用,这些调用会在HDFS集群上增加不必要的namenode负载。更关键的是,针对S3和其他对象存储,每个操作都可能发出多个HTTPS请求——这些请求确实需要可测量的时间。更好的是,如果S3认为您在整个机器集群中发出了太多请求,它将限制您。你不会想要那样的。

直到调用之后——你得到的文件状态条目是那些从对象存储的列表命令中返回的,通常每个HTTPS请求返回数千个文件条目,因此效率更高。

要了解更多细节,请查看org.apache.hadoop.fs.s3a.S3AFileSystem的来源。

相关内容

  • 没有找到相关文章

最新更新