>假设我有一个包含此类信息的大tsv文件:
2012-09-22 00:00:01.0 249342258346881024 47268866 0 0 0 bo
2012-09-22 00:00:02.0 249342260934746115 1344951 0 0 4 ot
2012-09-22 00:00:02.0 249342261098336257 346095334 1 0 0 ot
2012-09-22 00:05:02.0 249342261500977152 254785340 0 1 0 ot
我想实现一个 MapReduce 作业,它枚举五分钟的时间间隔并过滤 tsv 输入的一些信息。输出文件如下所示:
0 47268866 bo
0 134495 ot
0 346095334 ot
1 254785340 ot
键是区间的编号,例如,0是2012-09-22 00:00:00.0
到2012-09-22 00:04:59
之间的区间的参考。
我不知道这个问题是否不适合MapReduce方法,或者我是否认为不正确。在map函数中,我只是将时间戳作为键传递,将过滤后的信息作为值传递。在reduce函数中,我使用全局变量计算间隔并生成提到的输出。
i. 框架是以某种自动方式确定化简器的数量还是用户定义的?使用一个化简器,我认为我的方法没有问题,但我想知道在处理非常大的文件时,一个化简是否会成为瓶颈,可以吗?
ii. 如何解决多个减速器的问题?
任何建议将不胜感激!提前感谢!
编辑:
第一个问题由@Olaf回答,但第二个问题仍然让我对并行性产生一些疑问。我的地图函数的地图输出目前是这样的(我只是以分钟精度传递时间戳):
2012-09-22 00:00 47268866 bo
2012-09-22 00:00 344951 ot
2012-09-22 00:00 346095334 ot
2012-09-22 00:05 254785340 ot
因此,在reduce函数中,我接收的输入是键表示收集信息的分钟数和信息本身的值,我想枚举从0开始的五分钟间隔。我目前正在使用全局变量来存储间隔的开始,当键推断它时,我正在增加间隔计数器(这也是一个全局变量)。
这是代码:
private long stepRange = TimeUnit.MINUTES.toMillis(5);
private long stepInitialMillis = 0;
private int stepCounter = 0;
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
long millis = Long.valueOf(key.toString());
if (stepInitialMillis == 0) {
stepInitialMillis = millis;
} else {
if (millis - stepInitialMillis > stepRange) {
stepCounter = stepCounter + 1;
stepInitialMillis = millis;
}
}
for (Text value : values) {
context.write(new Text(String.valueOf(stepCounter)),
new Text(key.toString() + "t" + value));
}
}
因此,使用多个化简器,我将在两个或多个 JVM 中的两个或多个节点上运行我的 reduce 函数,我将失去全局变量给出的控制,并且我没有考虑针对我的情况的解决方法。
化简器的数量取决于集群的配置,尽管您可以限制MapReduce作业使用的化简器数量。
如果你正在处理任何大量的数据,单个化简器确实会成为你的MapReduce作业的瓶颈。
Hadoop MapReduce引擎保证与同一键关联的所有值都发送到同一个化简器,所以你的方法应该适用于多重化简器。有关详细信息,请参阅Yahoo!教程:http://developer.yahoo.com/hadoop/tutorial/module4.html#listreducing
编辑:为了保证同一时间间隔的所有值都转到同一个化简器,您必须使用时间间隔的一些唯一标识符作为键。您必须在映射器中执行此操作。我再次阅读您的问题,除非您想以某种方式聚合对应于相同时间间隔的记录之间的数据,否则您根本不需要任何化简器。
编辑:正如@SeanOwen指出的那样,化简器的数量取决于集群的配置。通常,它配置在每个节点的最大任务数乘以数据节点数的 0.95 到 1.75 倍之间。如果未在群集配置中设置 mapred.reduce.tasks 值,则默认的化简器数为 1。
看起来您想按五分钟块聚合一些数据。使用Hadoop的Map-reduce非常适合这种事情!不应该有理由使用任何"全局变量"。以下是我将如何设置它:
映射器读取 TSV 的一行。它获取时间戳,并计算它属于哪个五分钟的存储桶。把它做成一个字符串,然后作为键发出,比如"20120922:0000"、"20120922:0005"、"20120922:0010"等。至于与该键一起发出的值,只需保持简单开始,并在整个制表符分隔的行上作为另一个 Text 对象发送。
现在映射器已经确定了需要如何组织数据,那么化简器的工作就是进行聚合。每个减速器将获得一个密钥(五分钟降压器之一),以及适合该存储桶的所有行的列表。它可以遍历该列表,并从中提取所需的任何内容,根据需要将输出写入上下文。
至于映射器,让Hadoop弄清楚那部分。将化简器数设置为群集中具有的节点数,作为起点。应该运行良好。
希望这有帮助。