Unable to access distributed cache file



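I am trying to read two cache files in two different jobs.

With the one distributed cache file in job3, everything works fine, but in job5 I cannot access the second cache file. Instead, job5 gets the same distributed cache file as job3.

Why does this happen?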

Configuration conf3 = getConf();
Path getPath = new Path(out1,"part-r-*");
FileStatus[] list = fs.globStatus(getPath);
for(FileStatus status : list){
    DistributedCache.addCacheFile(status.getPath().toUri(), conf3);
}
Job job3 = new Job(conf3, "Compute Entropy");
job3.setJarByClass(ID3ModelDriver.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(DoubleWritable.class);
job3.setMapperClass(ID3EntropyMapper.class);
job3.setReducerClass(ID3EntropyReducer.class);
job3.setInputFormatClass(KeyValueTextInputFormat.class);
job3.setOutputFormatClass(TextOutputFormat.class);  
Path out3 = new Path(EQ);
if(fs.exists(out3)){
    fs.delete(out3, true);
}
FileInputFormat.addInputPath(job3,out2);
FileOutputFormat.setOutputPath(job3,out3);
job3.waitForCompletion(true);
/*
 * Job 4
 */
Configuration conf4 = getConf();
Job job4 = new Job(conf4, "Select Best Attribute");
job4.setJarByClass(ID3ModelDriver.class);
job4.setMapOutputKeyClass(Text.class);
job4.setMapOutputValueClass(DoubleWritable.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);
job4.setMapperClass(ID3SMMapper.class);
job4.setReducerClass(ID3SMReducer.class);
job4.setInputFormatClass(KeyValueTextInputFormat.class);
job4.setOutputFormatClass(TextOutputFormat.class);  
Path out4 = new Path(SM);
if(fs.exists(out4)){
    fs.delete(out4, true);
}
FileInputFormat.addInputPath(job4,out3);
FileOutputFormat.setOutputPath(job4,out4);
job4.waitForCompletion(true);
/*
 * Job 5
 */
System.out.println("job5");
Configuration conf5= getConf();
//PROBLEM HERE
//Not getting the correct distributed cache file
Path getSMPath = new Path(out4,"part-r-*");
FileStatus[] listSM = fs.globStatus(getSMPath);
for(FileStatus statusSM : listSM){
    DistributedCache.addCacheFile(statusSM.getPath().toUri(), conf5);
}
Job job5 = new Job(conf5, "Generate Subdataset");
System.out.println("conf");
job5.setJarByClass(ID3ModelDriver.class);
job5.setMapOutputKeyClass(Text.class);
job5.setMapOutputValueClass(Text.class);
job5.setOutputKeyClass(Text.class);
job5.setOutputValueClass(Text.class);
job5.setMapperClass(ID3GSMapper.class);
job5.setReducerClass(ID3GSReducer.class);
job5.setInputFormatClass(TextInputFormat.class);
job5.setOutputFormatClass(TextOutputFormat.class);  
Path out5 = new Path(args[1]);
if(fs.exists(out5)){
    fs.delete(out5, true);
}
FileInputFormat.addInputPath(job5,new Path(args[0]));
FileOutputFormat.setOutputPath(job5,out5);
boolean success = job5.waitForCompletion(true);
return(success ? 0 : 1);

Am I doing something wrong?

Please advise.

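getConf() returns the same Configuration object every time, so the file added to conf3 for job3 is still registered when job5's file is appended to conf5. In job5, cacheFiles[0] is therefore still job3's file, and job5's file is at cacheFiles[1]. To get all the cache files, iterate over the URIs: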

public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    // Files are listed in the order they were added to the Configuration
    for (URI cacheFile : cacheFiles) {
        Path getPath = new Path(cacheFile.getPath());
        // open getPath with fs.open(getPath) and read it here
    }
}
cacheFiles[0].getPath() gives the first path.
cacheFiles[1].getPath() gives the second path.
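The ordering described above can be reproduced without a cluster. The sketch below is a stdlib-only model, not Hadoop's API: a HashMap stands in for the Configuration, and addCacheFile/getCacheFiles mimic the comma-separated list that DistributedCache.addCacheFile appends to (the key string is only a label here). filesUnder is a hypothetical helper, and the hdfs://namenode/... URIs are placeholders. Sharing one map between the two jobs reproduces the question: job5 sees two files and index 0 is job3's. Giving each job its own copy of a clean base leaves only job5's file.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Stdlib-only model of the cache-file ordering in the driver above.
 * Assumption: a Map stands in for Hadoop's Configuration; nothing here is Hadoop API.
 */
public class CacheFileOrderDemo {

    // Stand-in property key; only used as a label inside the Map.
    static final String CACHE_FILES = "mapred.cache.files";

    // Appends a URI to a comma-separated list, like DistributedCache.addCacheFile.
    static void addCacheFile(URI uri, Map<String, String> conf) {
        String files = conf.get(CACHE_FILES);
        conf.put(CACHE_FILES, files == null ? uri.toString() : files + "," + uri);
    }

    // Splits the list back into URIs, preserving insertion order.
    static URI[] getCacheFiles(Map<String, String> conf) {
        String files = conf.get(CACHE_FILES);
        if (files == null) {
            return new URI[0];
        }
        String[] parts = files.split(",");
        URI[] uris = new URI[parts.length];
        for (int i = 0; i < parts.length; i++) {
            uris[i] = URI.create(parts[i]);
        }
        return uris;
    }

    // Hypothetical helper: select cache files by parent directory instead of by index.
    static List<URI> filesUnder(URI[] cacheFiles, String dir) {
        String prefix = dir.endsWith("/") ? dir : dir + "/";
        List<URI> matches = new ArrayList<>();
        for (URI uri : cacheFiles) {
            if (uri.getPath().startsWith(prefix)) {
                matches.add(uri);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Placeholder URIs; "namenode" is not a real host.
        URI job3File = URI.create("hdfs://namenode/out1/part-r-00000");
        URI job5File = URI.create("hdfs://namenode/out4/part-r-00000");

        // As in the question: conf3 and conf5 are the same object.
        Map<String, String> shared = new HashMap<>();
        addCacheFile(job3File, shared); // conf3, before job3
        addCacheFile(job5File, shared); // conf5, before job5
        URI[] seen = getCacheFiles(shared);
        System.out.println("shared: " + seen.length + " files, [0] = " + seen[0].getPath());
        // prints "shared: 2 files, [0] = /out1/part-r-00000"
        System.out.println("under /out4: " + filesUnder(seen, "/out4"));
        // prints "under /out4: [hdfs://namenode/out4/part-r-00000]"

        // Modeled fix: each job starts from its own copy of a clean base.
        Map<String, String> base = new HashMap<>();
        Map<String, String> conf3 = new HashMap<>(base);
        addCacheFile(job3File, conf3);
        Map<String, String> conf5 = new HashMap<>(base);
        addCacheFile(job5File, conf5);
        URI[] seenFixed = getCacheFiles(conf5);
        System.out.println("per-job copy: " + seenFixed.length + " file, [0] = " + seenFixed[0].getPath());
        // prints "per-job copy: 1 file, [0] = /out4/part-r-00000"
    }
}
```

In the real driver, the analogous change would likely be to build every job's configuration from a copy, for example Configuration conf3 = new Configuration(getConf()); and likewise for conf5, so the files added for job3 never reach job5. Selecting files by directory, as filesUnder does, is an alternative that keeps working regardless of how many files were added earlier.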
