目标:实现Reduce Side Join
我的代码中目前有作业链接(两个作业)。现在我想用另一份工作在reduce端实现加入。我必须接受多个输入:
Input #1:
上一个减速器的输出Input #2:
HDFS中用于实现联接的新文件。
看到一些关于如何使用MultipleInputs.addInputhPath(job, Path, InputFormat.class, Mapper.class);
的文章
所以我知道我必须使用它两次,一次用于Input #1
,一次为Input #2
。
Question 1:
那么,如果我使用两个单独的映射器和一个reducer,哪个映射器将首先执行(或者它们将并行执行)?如何在还原器侧检查哪个映射器已发出<key, value>
对?
Question 2:
如果我使用单个映射器和单个reducer,控制流是什么?
Question 3:
更多的是黑客,即不使用MultipleInputs
在减速器的setup()
方法中使用DistributedCache
加载Input #2
(性能方面)可以吗?并将上一个减速器的输出作为作业的唯一输入。
Note:
Input #2
文件的大小非常小。
答案1:
如果插槽可用,则两个映射器的Map
任务应并行运行。单个插槽的存在可能会导致它们按顺序运行(可能存在交错),但这不是正常情况。如果Mapper序列有一些配置,我不知道。
再次,我怀疑任何api
是否可用于识别哪个映射器发射了<key, value>
。准确地说,只需要识别value
,因为相同的key
可以由不同的映射发射。这通常是通过向输出值添加前缀标记并在reducer中解析这些标记来实现的。例如:
if(value.toString.startsWith("Input#1")){ //processing code }
看看这篇博客文章,它有所有必要的提示和技巧。请注意,所有这些示例都使用旧的mapred
api。但无论如何,逻辑都是一样的。
答案2:
如果没有MultipleInputs
,在Map
中,您必须使用可用的Context
对象来标识传入对的文件名。例如:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
//............
}
然后只需为输出值预加一个合适的标签,其余部分与answer1相同。
答案3:
这很棘手。当要添加到缓存的文件很小时,使用DistributedCache
进行连接可以提高性能。这可能是因为作业现在运行的Map
任务数量较少。但它会对大文件产生不利影响。难题是要知道DistributedCache
有多少字节被认为是小的。
由于您提到Input#2文件非常小,所以这应该是最适合您的解决方案。
注意:这篇文章的许多评论都是(有点)基于观点的。期待专家的意见。