我目前正在练习Map-reduce(Hadoop 2.2(,需要您在其中一个概念上的帮助。
我有一个用例,我想使用两个作业来完成。我希望将 job1 的输出写入分布式缓存,并将其作为输入传递给第二个作业。
基本上,我想避免将第一个作业的输出写入文件,从而导致开销。
用例输入:
歌曲文件 -
|编号 | 歌曲 | 类型 |
|S1 | 歌曲1 | 古典 |
|s2 |歌曲2 | 爵士乐 |
|s2 |歌曲3 | 古典 |
.
用户评分文件 -
|User_Id | Song_Id | 评分 |
|U1 | S1 | 7 |
|U2 | S2 | 5 |
|U3 | S2 | 9 |
|U4 | S1 | 7 |
|U5 | S5 | 5 |
|U6 | S1 | 9 |
注意:这两个文件都包含非常大的数据。
用例说明:
查找古典类型每首歌曲的平均评分。
我提出的实际/预期解决方案是我将使用两个链式作业。
1.Job1:它将获取古典歌曲的所有ID,并添加到分布式缓存
中
2.Job2:第二个作业中的映射器根据缓存中的值过滤经典歌曲的评级。 Reducer将计算每首歌曲的平均评级。
我在网上搜索了一下,看看我们是否可以将作业的输出直接写入分布式缓存,但找不到有用的信息。
我在堆栈溢出上发现了类似的问题:
"How to directly send the output of a mapper-reducer to a another mapper-reducer without
saving the output into the hdfs"
解决方案是使用"SequenceFileOutputFormat"。
但是,就我而言,我希望所有歌曲 ID 都可用于第二个作业中的每个映射器。所以我认为上述解决方案在我的情况下不起作用。
我想采用的另一种方法是运行第一个作业,该作业查找古典歌曲的 ID 并将输出(歌曲 ID(写入文件并创建一个新作业并将歌曲 id 输出文件添加到第二个作业的缓存中。请指教。
非常感谢您的帮助。
如果 eah 记录的大小较小<1mb,您可以将中间结果更新为 MemCached
遵循第二种方法。
第一个作业会将输出写入文件系统。
第二个作业将使用作业 API 而不是已弃用
DistributedCache
API 将所需文件传递给所有节点。
查看适用于以下方法的新作业 API
addCacheFile(URI uri)
getCacheFiles()
等。
可以在分布式缓存中加载第一个作业的输出,然后启动第二个作业。
//CONFIGURATION
Job job = Job.getInstance(getConf(), "Reading from distributed cache and etc.");
job.setJarByClass(this.getClass());
////////////
FileSystem fs = FileSystem.get(getConf());
/*
* if you have, for example, a map only job,
* that "something" could be "part-"
*/
FileStatus[] fileList = fs.listStatus(PATH OF FIRST JOB OUTPUT,
new PathFilter(){
@Override public boolean accept(Path path){
return path.getName().contains("SOMETHING");
}
} );
for(int i=0; i < fileList.length; i++){
DistributedCache.addCacheFile(fileList[i].getPath().toUri(), job.getConfiguration());
}
//other parameters
映射:
//in mapper
@Override
public void setup(Context context) throws IOException, InterruptedException {
//SOME STRUCT TO STORE VALUES READ (arrayList, HashMap..... whatever)
Object store = null;
try{
Path[] fileCached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
if(fileCached != null && fileCached.length > 0) {
for(Path file : fileCached) {
readFile(file);
}
}
} catch(IOException ex) {
System.err.println("Exception in mapper setup: " + ex.getMessage());
}
}
private void readFile(Path filePath) {
try{
BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath.toString()));
String line = null;
while((line = bufferedReader.readLine()) != null) {
//reading line by line that file and updating our struct store
//....
} //end while (cycling over lines in file)
bufferedReader.close();
} catch(IOException ex) {
System.err.println("Exception while reading file: " + ex.getMessage());
}
} //end readFile method
现在在映射阶段,您将文件作为输入传递给作业,并且您需要的值存储在结构store
中。
我的答案来自如何在分布式缓存中使用MapReduce输出。