我正在使用Hadoop来分析非常不均匀的数据分布。 有些键有数千个值,但大多数只有一个值。 例如,与 IP 地址关联的网络流量将具有许多与几个可交谈的 IP 关联的数据包,而只有少数数据包与大多数 IP 相关联。 另一种说法是基尼指数非常高。
为了有效地处理这个问题,每个化简器应该得到一些高容量键或大量低容量键,以获得大致均匀的负载。 我知道如果我在编写分区过程,我会怎么做:我会获取映射器生成的keys
(包括所有重复键(的排序列表以及N
的化简器数量,并将拆分放在
split[i] = keys[floor(i*len(keys)/N)]
减速器i
将获得k
密钥,以便split[i] <= k < split[i+1]
用于0 <= i < N-1
,split[i] <= k
用于i == N-1
。
我愿意用Java编写自己的分区程序,但是Partitioner<KEY,VALUE>类似乎一次只能访问一个键值记录,而不是整个列表。 我知道Hadoop对映射器生成的记录进行排序,所以这个列表一定存在于某个地方。 它可能分布在多个分区程序节点之间,在这种情况下,我会在其中一个子列表上执行拆分过程,并以某种方式将结果传达给所有其他分区程序节点。 (假设所选分区程序节点看到随机子集,则结果仍将是近似的负载平衡。 有谁知道排序的密钥列表存储在哪里,以及如何访问它?
我不想写两个map-reduce作业,一个用于查找拆分,另一个用于实际使用它们,因为这似乎很浪费。 (映射器必须执行两次相同的工作。 这似乎是一个普遍的问题:分布不均匀很常见。
我也一直在考虑这个问题。 如果有人强迫我,我会采取这种高级方法。
- 除了用于解决业务问题的映射器逻辑之外,还可以编写一些逻辑来收集分区程序中所需的任何统计信息,以便以平衡的方式分发键值对。 当然,每个映射器只会看到一些数据。
- 每个映射器都可以找出其任务 ID,并使用该 ID 在指定的 hdfs 文件夹中构建唯一的文件名,以保存收集的统计信息。 将此文件写在任务结束时运行的 cleanup(( 方法中。
- 在分区程序中使用延迟初始化来读取指定 HDFS 目录中的所有文件。 这样可以获取在映射器阶段收集的所有统计信息。 从那里,您可以实现正确分区数据所需的任何分区逻辑。
这一切都假设在所有映射器完成之前不会调用分区程序,但这是我迄今为止能够做的最好的事情。
据我所知 - MR 处理中没有一个地方存在所有密钥。更重要的是 - 不能保证单台机器可以存储这些数据。我认为这个问题在当前的MR框架中没有理想的解决方案。我认为是这样,因为要获得理想的解决方案 - 我们必须等待最后一个映射器的结束,然后才分析密钥分布并利用这些知识参数化分区器。
这种方法将使系统大大复杂化并增加延迟。
我认为好的近似可能是对数据进行随机抽样以获得键分布的想法,然后使 partiotioner 根据它工作。
据我了解,Terasort实现正在做一些非常相似的事情:http://sortbenchmark.org/YahooHadoop.pdf