我是Hadoop的新手,我设法运行wordCount示例:http://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html
假设我们有一个包含 3 个文件的文件夹。我想每个文件都有一个映射器,这个映射器只会计算行数并将其返回给化简器。
然后,化简器将每个映射器的行数作为输入,并将所有 3 个文件中存在的行总数作为输出。
因此,如果我们有以下 3 个文件
input1.txt
input2.txt
input3.txt
映射器返回:
mapper1 -> [input1.txt, 3]
mapper2 -> [input2.txt, 4]
mapper3 -> [input3.txt, 9]
减速器将给出
3+4+9 = 16
我已经在一个简单的Java应用程序中完成了此操作,因此我想在Hadoop中执行此操作。我只有 1 台计算机,想尝试在伪分布式环境中运行。
我怎样才能实现这个目标?我应该采取哪些正确的步骤?
我的代码应该看起来像 apache 示例中的样子吗?我将有两个静态类,一个用于映射器,一个用于化简器?或者我应该有 3 个类,每个映射器一个?
如果你能指导我完成这个,我不知道该怎么做,我相信如果我设法编写一些代码来做这些事情,那么我将能够编写更复杂的应用程序在未来。
谢谢!
除了 sa125 的答案之外,您还可以通过不为每个输入记录发出一条记录来极大地提高性能,而只是在映射器中累积一个计数器,然后在映射器清理方法中,发出文件名和计数值:
public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
protected long lines = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
String filename = split.getPath().toString();
context.write(new Text(filename), new LongWritable(lines));
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
lines++;
}
}
我注意到您使用了 0.18 版本的文档。这是指向 1.0.2(最新(的链接。
第一个建议 - 使用IDE(日食,IDEA等(。它真的有助于填补空白。
在实际的HDFS中,你无法知道文件的每一部分驻留在哪里(不同的机器和集群(。无法保证第 X 行甚至与第 Y 行驻留在同一个磁盘上。也不能保证第 X 行不会拆分到不同的机器上(HDFS 将数据以块的形式分发,通常每个块 64Mb(。这意味着您不能假设同一个映射器将处理整个文件。您可以确保每个文件都由同一个化简器处理。
由于从映射器发送的每个键的化简器都是唯一的,因此我这样做的方法是使用文件名作为映射器中的输出键。此外,映射器的默认输入类是 TextInputFormat
,这意味着每个映射器将自行接收整行(由 LF 或 CR 终止(。然后,您可以从映射器发出文件名和数字 1(或任何与计算无关的内容(。然后,在化简器中,您只需使用循环来计算文件名的接收次数:
在映射器的地图函数中
public static class Map extends Mapper<IntWritable, Text, Text, Text> {
public void map(IntWritable key, Text value, Context context) {
// get the filename
InputSplit split = context.getInputSplit();
String fileName = split.getPath().getName();
// send the filename to the reducer, the value
// has no meaning (I just put "1" to have something)
context.write( new Text(fileName), new Text("1") );
}
}
在减速器的减速功能
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text fileName, Iterator<Text> values, Context context) {
long rowcount = 0;
// values get one entry for each row, so the actual value doesn't matter
// (you can also get the size, I'm just lazy here)
for (Text val : values) {
rowCount += 1;
}
// fileName is the Text key received (no need to create a new object)
context.write( fileName, new Text( String.valueOf( rowCount ) ) );
}
}
在驱动程序/主驱动程序中
你几乎可以使用与字数统计示例相同的驱动程序 - 请注意,我使用了新的mapreduce API,所以你需要调整一些东西(Job
而不是JobConf
等(。当我阅读它时,这真的很有帮助。
请注意,您的 MR 输出将只是每个文件名及其行计数:
input1.txt 3
input2.txt 4
input3.txt 9
如果您只想计算所有文件中的行总数,只需在所有映射器中发出相同的键(而不是文件名(。这样,将只有一个化简器来处理所有行计数:
// no need for filename
context.write( new Text("blah"), new Text("1") );
您还可以链接一个将处理每个文件行计数输出的作业,或执行其他花哨的操作 - 这取决于您。
我省略了一些样板代码,但基础知识就在那里。请务必检查我,因为我是从内存中键入的大部分内容。:)
希望这有帮助!