有人可以举例说明mapreduce中中位数/分位数的计算吗?
我对 Datafu 中位数的理解是,"n"映射器对数据并将数据发送到负责排序的"1"减速器来自 n 个映射器的所有数据并找到中位数(中间值)我的理解正确吗?
如果是这样,此方法是否适用于海量数据,因为我可以清楚地看到一个减速器努力完成最后的任务。谢谢
试图找到序列中的中位数(中间数)将需要将 1 个化简器传递给整个数字范围以确定哪个是"中间"值。
根据输入集中值的范围和唯一性,您可以引入一个组合器来输出每个值的频率 - 减少发送到单个化简器的映射输出数量。然后,您的化简器可以使用排序值/频率对来识别中位数。
另一种可以扩展此值的方法(如果您知道值的范围和粗略分布)是使用自定义分区程序,该分区程序按范围存储桶分发键(0-99 转到化简器 0,100-199 转到化简器 2,依此类推)。然而,这将需要一些次要工作来检查化简器输出并执行最终的中位数计算(例如,知道每个化简器中的键数,您可以计算哪个化简器输出将包含中位数,以及哪个偏移
你真的需要精确的中位数和分位数吗?
很多时候,你最好只获取近似值,并使用它们,特别是如果你使用它来例如数据分区。
实际上,您可以使用近似分位数来加快查找确切分位数的速度(实际上是O(n/p)
时间内),以下是该策略的粗略概述:
- 让每个分区的映射器计算所需的分位数,并将其输出到新的数据集。这个数据集应该小几个数量级(除非你要求太多的分位数!
- 在此数据集中,再次计算分位数,类似于"中位数"。这些是你的初步估计。
- 根据这些分位数(甚至是以这种方式获得的其他分区)对数据进行重新分区。目标是最终,真正的分位数保证在一个分区中,并且每个分区中最多应该有一个所需的分位数
- 在每个分区中,执行快速选择(以
O(n)
为单位)以查找真正的分位数。
每个步骤都是线性时间。最昂贵的步骤是第 3 部分,因为它需要重新分发整个数据集,因此它会生成O(n)
网络流量。您可以通过为第一次迭代选择"备用"分位数来优化该过程。比如说,你想找到全局中位数。您无法在线性过程中轻松找到它,但是当它被拆分为 k 个分区时,您可能会将其缩小到数据集的 1/kth。因此,与其让每个节点报告其中位数,不如让每个节点额外报告(k-1)/(2k)和(k+1)/(2k)处的对象。这应该允许您缩小真实中位数必须显著存在的值范围。因此,在下一步中,您可以将每个节点将所需范围内的对象发送到单个主节点,并仅选择此范围内的中位数。
O((n log n)/p) 对其进行排序,然后 O(1) 获取中位数。
是的。。。你可以得到O(n/p),但你不能使用Hadoop中的开箱即用排序功能。我只会排序并获取中心项目,除非您能证明 2-20 小时的开发时间来编写并行第 k 个最大算法是合理的。
在许多实际方案中,数据集中值的基数相对较小。在这种情况下,可以通过两个MapReduce作业有效地解决问题:
- 计算数据集中值的频率(基本上是字数统计作业)
- 恒等映射器+根据<值-频率>对计算中位数的化简器值-频率>
作业 1. 将大大减少数据量,并且可以完全并行执行。作业 2 的减速器。只需要处理n
(n
= cardinality of your value set
)项而不是所有值,就像朴素的方法一样。
下面,作业 2 的示例化简器。它是可以直接在Hadoop流中使用的python脚本。假设数据集中的值ints
,但可以很容易地用于double
import sys
item_to_index_range = []
total_count = 0
# Store in memory a mapping of a value to the range of indexes it has in a sorted list of all values
for line in sys.stdin:
item, count = line.strip().split("t", 1)
new_total_count = total_count + int(count)
item_to_index_range.append((item, (total_count + 1, new_total_count + 1)))
total_count = new_total_count
# Calculate index(es) of middle items
middle_items_indexes = [(total_count / 2) + 1]
if total_count % 2 == 0:
middle_items_indexes += [total_count / 2]
# Retrieve middle item(s)
middle_items = []
for i in middle_items_indexes:
for item, index_range in item_to_index_range:
if i in range(*index_range):
middle_items.append(item)
continue
print sum(middle_items) / float(len(middle_items))
这个答案建立在最初来自克里斯怀特的答案的建议之上。答案建议使用组合器作为平均值来计算值的频率。但是,在MapReduce中,组合器不能保证始终执行。这有一些副作用:
- 化简器首先必须计算最终<值 _x002D_=" 频率=">对,然后计算中位数。 值>
- 在最坏的情况下,合并器将永远不会被执行,并且化简器仍然必须努力处理所有单个值。