有效的减少MapReduce结果的方法



我编写了一个MapReduce作业,它在数据集上获取ngram计数。结果保存在100个300MB的文件中,格式为<ngram>t<count>。我想把这些合并成一个结果,但我的几次合并尝试都失败了("任务跟踪器已经消失了")。我的超时时间是8小时,而这次崩溃发生在8.5小时左右,所以可能是相关的。我有# reducers=5(与# of nodes相同)。也许我只需要留出更多的时间,尽管错误似乎没有表明这一点。我怀疑我的节点正在超载,并且变得没有响应。我的理论是减速器可以进行一些优化。

我使用cat作为我的映射器,并使用以下python脚本作为我的reducer:

#!/usr/bin/env python
import sys
counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if key not in counts:
        counts[key] = 0
    counts[key] += count
for key in sorted(counts.keys()):
    print '%st%s'% (key, counts[key])

更新:正如我在我的一个评论中暗示的那样,我对Hadoop自动进行的排序感到困惑。在web UI中,reducer状态显示了几个不同的阶段,包括"sort"one_answers"reduce"。由此,我假设Hadoop在将mapper输出发送到reduce之前对其进行排序,但不清楚的是,排序是针对发送到reducer的所有数据,还是针对每个文件进行排序。换句话说,我的映射器获取100个字段,将其分成400个输出,每个都简单地将它们cat -ing到reducer,然后reducer(总共5个)每个接收这80个流。排序是将所有80个组合起来,还是对1进行排序,减少它;等等?根据图表,排序过程发生在任何减少之前,而这些图表显然不能指示实际的行为。如果排序确实对所有输入文件进行了排序,那么我可以简化我的reducer,使其不存储所有计数的字典,而只在键更改时打印出key- totalcount对。

关于使用组合器,我不认为这对我来说是有益的,因为我正在减少的数据已经在我试图组合的100个文件中减少了。因为我的# nodes = # reducers (5 &

问题是我误解了MapReduce的工作方式。所有进入Reducer的数据都是排序的。我上面的代码完全没有经过优化。相反,我只是跟踪当前键,然后在出现新键时打印出前一个当前键。

#!/usr/bin/env python
import sys
cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    # if new key, reset count, note current key, and output lastk key's result
    if key != cur_key:
        if cur_key is not None:
            print '%st%s'% (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count += count
# printing out final key if set
if cur_key:
    print '%st%s'% (cur_key, cur_key_count)

使用top检查您的reducer在运行时是CPU绑定而不是IO绑定(可能导致交换)。

每台主机8小时/20个作业等于每300Mb作业24分钟

你可以使用heapq,这样在内存中构建的数据结构是保持排序的:参见8.4.1节:http://docs.python.org/library/heapq.html

相关内容

  • 没有找到相关文章

最新更新