使用MultipleInputs的Hadoop映射程序的控制流是什么



目标:实现Reduce Side Join

我的代码中目前有作业链接(两个作业)。现在我想用另一份工作在reduce端实现加入。我必须接受多个输入:

Input #1:上一个减速器的输出
Input #2: HDFS中用于实现联接的新文件。

看到一些关于如何使用MultipleInputs.addInputhPath(job, Path, InputFormat.class, Mapper.class); 的文章

所以我知道我必须使用它两次,一次用于Input #1,一次为Input #2

Question 1:那么,如果我使用两个单独的映射器和一个reducer,哪个映射器将首先执行(或者它们将并行执行)?如何在还原器侧检查哪个映射器已发出<key, value>对?

Question 2:如果我使用单个映射器和单个reducer,控制流是什么?

Question 3:更多的是黑客,即不使用MultipleInputs
在减速器的setup()方法中使用DistributedCache加载Input #2(性能方面)可以吗?并将上一个减速器的输出作为作业的唯一输入。

Note: Input #2文件的大小非常小。

答案1:
如果插槽可用,则两个映射器的Map任务应并行运行。单个插槽的存在可能会导致它们按顺序运行(可能存在交错),但这不是正常情况。如果Mapper序列有一些配置,我不知道。

再次,我怀疑任何api是否可用于识别哪个映射器发射了<key, value>。准确地说,只需要识别value,因为相同的key可以由不同的映射发射。这通常是通过向输出值添加前缀标记并在reducer中解析这些标记来实现的。例如:

if(value.toString.startsWith("Input#1")){  //processing code }

看看这篇博客文章,它有所有必要的提示和技巧。请注意,所有这些示例都使用旧的mapredapi。但无论如何,逻辑都是一样的。

答案2:
如果没有MultipleInputs,在Map中,您必须使用可用的Context对象来标识传入对的文件名。例如:

public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();   
        //............
}

然后只需为输出值预加一个合适的标签,其余部分与answer1相同。

答案3:
这很棘手。当要添加到缓存的文件很小时,使用DistributedCache进行连接可以提高性能。这可能是因为作业现在运行的Map任务数量较少。但它会对大文件产生不利影响。难题是要知道DistributedCache有多少字节被认为是小的。

由于您提到Input#2文件非常小,所以这应该是最适合您的解决方案。

注意:这篇文章的许多评论都是(有点)基于观点的。期待专家的意见。

相关内容

  • 没有找到相关文章

最新更新