如何将mapreduce作业的输出直接写入分布式缓存,以便将其传递给另一个作业



我目前正在练习Map-reduce(Hadoop 2.2(,需要您在其中一个概念上的帮助。

我有一个用例,我想使用两个作业来完成。我希望将 job1 的输出写入分布式缓存,并将其作为输入传递给第二个作业。

基本上,我想避免将第一个作业的输出写入文件,从而导致开销。

用例输入:

歌曲文件 -

|编号 | 歌曲 | 类型 |

|S1 | 歌曲1 | 古典 |
|s2 |歌曲2 | 爵士乐 |
|s2 |歌曲3 | 古典 |
.

用户评分文件 -

|User_Id | Song_Id | 评分 |

|

U1 | S1       | 7    |  
|U2 | S2   | 5 |  
       |U3 | S2   | 9 |  
       |U4 | S1   | 7 |  
       |U5 | S5   | 5 |  
       |U6 | S1   | 9 |  
       

注意:这两个文件都包含非常大的数据。

用例说明:

查找古典类型每首歌曲的平均评分。

我提出的实际/预期解决方案是我将使用两个链式作业。
1.Job1:它将获取古典歌曲的所有ID,并添加到分布式缓存

2.Job2:第二个作业中的映射器根据缓存中的值过滤经典歌曲的评级。 Reducer将计算每首歌曲的平均评级。

我在网上搜索了一下,看看我们是否可以将作业的输出直接写入分布式缓存,但找不到有用的信息。

我在堆栈溢出上发现了类似的问题:

"How to directly send the output of a mapper-reducer to a another mapper-reducer without
 saving the output into the hdfs"

解决方案是使用"SequenceFileOutputFormat"。

但是,就我而言,我希望所有歌曲 ID 都可用于第二个作业中的每个映射器。所以我认为上述解决方案在我的情况下不起作用。

我想采用的另一种方法是运行第一个作业,该作业查找古典歌曲的 ID 并将输出(歌曲 ID(写入文件并创建一个新作业并将歌曲 id 输出文件添加到第二个作业的缓存中。请指教。

非常感谢

您的帮助。

如果 eah 记录的大小较小<1mb,您可以将中间结果更新为 MemCached

遵循第二种方法。

  1. 第一个作业会将输出写入文件系统。

  2. 第二个作业将使用作业 API 而不是已弃用DistributedCache API 将所需文件传递给所有节点。

查看适用于以下方法的新作业 API

addCacheFile(URI uri)
getCacheFiles()

等。

一种方法

可以在分布式缓存中加载第一个作业的输出,然后启动第二个作业。

//CONFIGURATION
Job job = Job.getInstance(getConf(), "Reading from distributed cache and etc.");
job.setJarByClass(this.getClass());
////////////
FileSystem fs = FileSystem.get(getConf());
/*
 * if you have, for example, a map only job, 
 * that "something" could be "part-"
 */
FileStatus[] fileList = fs.listStatus(PATH OF FIRST JOB OUTPUT, 
                           new PathFilter(){
                                 @Override public boolean accept(Path path){
                                        return path.getName().contains("SOMETHING");
                                 } 
                            } );
for(int i=0; i < fileList.length; i++){ 
    DistributedCache.addCacheFile(fileList[i].getPath().toUri(), job.getConfiguration());
}
//other parameters

映射:

//in mapper
@Override
public void setup(Context context) throws IOException, InterruptedException {
    //SOME STRUCT TO STORE VALUES READ (arrayList, HashMap..... whatever)
    Object store = null;
    try{
        Path[] fileCached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if(fileCached != null && fileCached.length > 0) {
             for(Path file : fileCached) {
                readFile(file);
                }
        }
    } catch(IOException ex) {
        System.err.println("Exception in mapper setup: " + ex.getMessage());
    }
}
private void readFile(Path filePath) {
    try{
        BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath.toString()));
        String line = null;
        while((line = bufferedReader.readLine()) != null) {
            //reading line by line that file and updating our struct store
            //....
        } //end while (cycling over lines in file)
        bufferedReader.close();
    } catch(IOException ex) {
        System.err.println("Exception while reading file: " + ex.getMessage());
    }
} //end readFile method

现在在映射阶段,您将文件作为输入传递给作业,并且您需要的值存储在结构store中。

我的答案来自如何在分布式缓存中使用MapReduce输出。

相关内容

  • 没有找到相关文章

最新更新