我需要处理一个文本文件文件夹。文本文件可以是任何扩展名。
对于每个扩展,我们需要单独的自定义读取器来处理hadoop中的文件。
folder1/
Data1.pdf
Data2.xml
Data3.html
Data4.txt
Data5.csv
获取文件夹中文件扩展名并为我的MR作业设置自定义输入格式的更好方法是什么?
到目前为止,我所做的是
驱动
FileStatus[] stati = null;
try {
stati = fs.listStatus(in);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
for (FileStatus status : stati) {
Path path = status.getPath();
System.out.println("Path----> "+path);
/*
* get file extension
*/
String ext = FilenameUtils.getExtension(path.toString());
System.out.println("ext--->"+ext);
if(ext.equals("pdf")){
//custom pdf record reader
job.setInputFormatClass(PdfInputFormat.class);
}
else{
job.setInputFormatClass(TextInputFormat.class);
}
}
但这在folder1中不起作用,但如果folder1只包含.pdf文件,则可以正常工作。
我错过了什么吗?
希望这不会很好,因为我正在遍历文件夹(比如folder2-->Data5.pdf,Data4.csv)。这个setInputFormatClass不会对作为输入格式的TextInputFormat.addInputPath(job,in)将设置为Data4.csv.的最后一个迭代值
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
try {
TextInputFormat.addInputPath(job, in);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
TextOutputFormat.setOutputPath(job, out);
编辑
Job job = null;
try {
job = new Job(conf, "TextMining");
} catch (IOException e) {
e.printStackTrace();
}
/*
* check entension
*/
for (FileStatus status : stati) {
Path path = status.getPath();
System.out.println("Path----> "+path);
/*
* get file extension
*/
String ext = FilenameUtils.getExtension(path.toString());
System.out.println("ext--->"+ext);
if(ext.equals("pdf")){
System.out.println("Pdf File Format");
// MultipleInputs.addInputPath(job, path,PdfInputFormat.class, PDFStemmingMapper.class);
job.setInputFormatClass(PdfInputFormat.class);
}
else if(ext.equals("xlsx")){
System.out.println("Excel File Format");
job.setInputFormatClass(ExcelInputFormat.class);
}
else{
System.out.println("normal Text File");
job.setInputFormatClass(TextInputFormat.class);
}
}
job.setJarByClass(Driver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
//job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
// try {
// TextInputFormat.addInputPath(job, in);
// } catch (IOException e) {
// e.printStackTrace();
// }
TextOutputFormat.setOutputPath(job, out);
我正在努力实现上述目标。但这并没有带来任何产出。请提出建议。
从上下文,获取Input Split,然后获取Path&路径中的名称。
Context => getInputSplit() => getPath => getName()
获得名称后,从该索引中查找lastIndexOf(".")
和子字符串。
现在在子字符串中有了扩展名,并使用它进行比较。
编辑:
以下方法对你们可行吗?
-
为每种类型的扩展提供单独的Mapper。
-
在Driver类中添加以下行。
MultipleInputs.addInputPath(job, path_pdf,inputFormatClass, PDFMapper.class) MultipleInputs.addInputPath(job, path_xml, inputFormatClass,XMLMapper.class) MultipleInputs.addInputPath(job, path_html,inputFormatClass,HTMLMapper.class) MultipleInputs.addInputPath(job, path_csv,inputFormatClass,CVSMapper.class)
对于映射器,期望的输入是一条记录(由值指示)。如何构造此记录并将其传递给map方法由InputFormat处理。
例如:默认的输入格式TextInputFormat会将文件中的一行视为记录。这通常适用于txt/xsv文件。
对于其他文件类型,更好的方法是使用自定义输入格式,它知道如何表示一条记录。(在XML中,一条记录可以是一个子块)
如果您已经为已识别的所有文件类型创建了InputFormat类,则可以使用MultipleInputs。
看看这里的Javadochttps://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.html
可以使用FileSystem API