Java Hadoop:如何创建映射器作为输入文件,并给出每个文件中行数的输出



我是Hadoop的新手,我设法运行wordCount示例:http://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html

假设我们有一个包含 3 个文件的文件夹。我想每个文件都有一个映射器,这个映射器只会计算行数并将其返回给化简器。

然后,化简器将每个映射器的行数作为输入,并将所有 3 个文件中存在的行总数作为输出。

因此,如果我们有以下 3 个文件

input1.txt
input2.txt
input3.txt

映射器返回:

mapper1 -> [input1.txt, 3]
mapper2 -> [input2.txt, 4]
mapper3 -> [input3.txt, 9]

减速器将给出

3+4+9 = 16 

我已经在一个简单的Java应用程序中完成了此操作,因此我想在Hadoop中执行此操作。我只有 1 台计算机,想尝试在伪分布式环境中运行。

我怎样才能实现这个目标?我应该采取哪些正确的步骤?

我的代码应该看起来像 apache 示例中的样子吗?我将有两个静态类,一个用于映射器,一个用于化简器?或者我应该有 3 个类,每个映射器一个?

如果你能指导我完成这个,我不知道该怎么做,我相信如果我设法编写一些代码来做这些事情,那么我将能够编写更复杂的应用程序在未来。

谢谢!

除了 sa125 的答案之外,您还可以通过不为每个输入记录发出一条记录来极大地提高性能,而只是在映射器中累积一个计数器,然后在映射器清理方法中,发出文件名和计数值:

public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    protected long lines = 0;
    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        String filename = split.getPath().toString();
        context.write(new Text(filename), new LongWritable(lines));
    }
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        lines++;
    }
}

我注意到您使用了 0.18 版本的文档。这是指向 1.0.2(最新(的链接。

第一个建议 - 使用IDE(日食,IDEA等(。它真的有助于填补空白。

在实际的HDFS中,你无法知道文件的每一部分驻留在哪里(不同的机器和集群(。无法保证第 X 行甚至与第 Y 行驻留在同一个磁盘上。也不能保证第 X 行不会拆分到不同的机器上(HDFS 将数据以块的形式分发,通常每个块 64Mb(。这意味着您不能假设同一个映射器将处理整个文件。您可以确保每个文件都由同一个化简器处理

由于从映射器发送的每个键的化简器都是唯一的,因此我这样做的方法是使用文件名作为映射器中的输出键。此外,映射器的默认输入类是 TextInputFormat ,这意味着每个映射器将自行接收整行(由 LF 或 CR 终止(。然后,您可以从映射器发出文件名和数字 1(或任何与计算无关的内容(。然后,在化简器中,您只需使用循环来计算文件名的接收次数:

在映射器的地图函数中

public static class Map extends Mapper<IntWritable, Text, Text, Text> {
  public void map(IntWritable key, Text value, Context context) {
    // get the filename
    InputSplit split = context.getInputSplit();
    String fileName = split.getPath().getName();
    // send the filename to the reducer, the value
    // has no meaning (I just put "1" to have something)
    context.write( new Text(fileName), new Text("1") );
  }
}

在减速器的减速功能

public static class Reduce extends Reducer<Text, Text, Text, Text> {
  public void reduce(Text fileName, Iterator<Text> values, Context context) {
    long rowcount = 0;
    // values get one entry for each row, so the actual value doesn't matter
    // (you can also get the size, I'm just lazy here)
    for (Text val : values) {
      rowCount += 1;
    }
    // fileName is the Text key received (no need to create a new object)
    context.write( fileName, new Text( String.valueOf( rowCount ) ) );
  }
}

在驱动程序/主驱动程序中

你几乎可以使用与字数统计示例相同的驱动程序 - 请注意,我使用了新的mapreduce API,所以你需要调整一些东西(Job而不是JobConf等(。当我阅读它时,这真的很有帮助。

请注意,您的 MR 输出将只是每个文件名及其行计数:

input1.txt    3
input2.txt    4
input3.txt    9

如果您只想计算所有文件中的行总数,只需在所有映射器中发出相同的键(而不是文件名(。这样,将只有一个化简器来处理所有行计数:

// no need for filename
context.write( new Text("blah"), new Text("1") );

您还可以链接一个将处理每个文件行计数输出的作业,或执行其他花哨的操作 - 这取决于您。

我省略了一些样板代码,但基础知识就在那里。请务必检查我,因为我是从内存中键入的大部分内容。:)

希望这有帮助!

相关内容

  • 没有找到相关文章

最新更新