我有很多输入文件,我想根据最后附加的日期处理选定的文件。我现在很困惑,我在哪里使用globStatus方法来过滤掉文件。
我有一个自定义RecordReader类,我试图在其下一个方法中使用globStatus,但它没有成功。
public boolean next(Text key, Text value) throws IOException {
Path filePath = fileSplit.getPath();
if (!processed) {
key.set(filePath.getName());
byte[] contents = new byte[(int) fileSplit.getLength()];
value.clear();
FileSystem fs = filePath.getFileSystem(conf);
fs.globStatus(new Path("/*" + date));
FSDataInputStream in = null;
try {
in = fs.open(filePath);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
我知道它返回一个FileStatus数组,但是我如何使用它来过滤文件呢?有人能告诉我吗?
globStatus
方法有2个附加参数,允许您过滤文件。第一个是glob模式,但有时glob模式不够强大,无法过滤特定的文件,在这种情况下,可以定义一个PathFilter
。
对于glob模式,支持以下操作:
Glob | Matches
-------------------------------------------------------------------------------------------------------------------
* | Matches zero or more characters
? | Matches a single character
[ab] | Matches a single character in the set {a, b}
[^ab] | Matches a single character not in the set {a, b}
[a-b] | Matches a single character in the range [a, b] where a is lexicographically less than or equal to b
[^a-b] | Matches a single character not in the range [a, b] where a is lexicographically less than or equal to b
{a,b} | Matches either expression a or b
c | Matches character c when it is a metacharacter
PathFilter
就是这样一个简单的接口:
public interface PathFilter {
boolean accept(Path path);
}
所以你可以实现这个接口,并实现accept
方法,你可以把你的逻辑过滤文件。
摘自Tom White的一本好书中的一个例子,它允许你定义一个PathFilter
来过滤匹配某个正则表达式的文件:
public class RegexExcludePathFilter implements PathFilter {
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
}
您可以在初始化作业时通过调用FileInputFormat.setInputPathFilter(JobConf, RegexExcludePathFilter.class)
来直接使用PathFilter
实现过滤输入。
EDIT:由于必须在setInputPathFilter
中传递类,因此不能直接传递参数,但应该能够通过使用Configuration
来做类似的事情。如果你让你的RegexExcludePathFilter
也从Configured
扩展,你可以得到一个Configuration
对象,你之前已经初始化了所需的值,所以你可以在你的过滤器中得到这些值,并在accept
中处理它们。
例如:
conf.set("date", "2013-01-15");
然后你可以这样定义你的过滤器:
public class RegexIncludePathFilter extends Configured implements PathFilter {
private String date;
private FileSystem fs;
public boolean accept(Path path) {
try {
if (fs.isDirectory(path)) {
return true;
}
} catch (IOException e) {}
return path.toString().endsWith(date);
}
public void setConf(Configuration conf) {
if (null != conf) {
this.date = conf.get("date");
try {
this.fs = FileSystem.get(conf);
} catch (IOException e) {}
}
}
}
EDIT 2:原始代码有一些问题,请参阅更新后的类。您还需要删除构造函数,因为它不再被使用,并检查它是否是一个目录,在这种情况下,您应该返回true,以便目录的内容也可以被过滤。
对于任何读到这篇文章的人,我可以说"请不要在过滤器中做任何比验证路径更复杂的事情"。具体来说:不要检查文件是否为目录,获取其大小等。等待,直到list/glob操作返回,然后使用现在填充的FileStatus
条目中的信息在那里进行过滤。
为什么?所有对getFileStatus()
的调用,无论是直接调用还是通过isDirectory()
调用,都是对文件系统进行不必要的调用,这些调用会在HDFS集群上增加不必要的namenode负载。更关键的是,针对S3和其他对象存储,每个操作都可能发出多个HTTPS请求——这些请求确实需要可测量的时间。更好的是,如果S3认为您在整个机器集群中发出了太多请求,它将限制您。你不会想要那样的。
直到调用之后——你得到的文件状态条目是那些从对象存储的列表命令中返回的,通常每个HTTPS请求返回数千个文件条目,因此效率更高。
要了解更多细节,请查看org.apache.hadoop.fs.s3a.S3AFileSystem
的来源。