>我在HDFS中复制了一个包含1000万行的文件。需要在映射器中处理行号 5000 到 500000。我应该怎么做?
我尝试在映射器中覆盖 run() 方法并在那里尝试使用计数器。但是,当文件被拆分并且多个映射器正在运行时,当然会有多个计数器正在运行。所以它没有帮助。粘贴下面的代码。
@Override
public void run(Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
setup(context);
Integer counter = 0;
while (context.nextKeyValue()) {
LongWritable currentKey = context.getCurrentKey();
Text currentValue = context.getCurrentValue();
System.out.println(currentKey.toString());
map(currentKey, currentValue, context);
counter++;
}
System.out.println("Counter: " + counter + " Time: "
+ System.currentTimeMillis());
}
此外,我在映射器中获得的键不是行号,而是行的偏移量。我们可以得到指向行号的键吗?如果是这样,它在多个映射器中是唯一的吗?(当前 KEY(偏移量)在映射器中不是唯一的)。
我怎样才能做对?
-
默认的输入格式(如 TextInputFormat)将给出记录的字节偏移量而不是实际的行号 - 这主要是由于当输入文件可拆分并由两个或多个映射器处理时无法确定真正的行号。
-
您可以创建自己的 InputFormat 来生成行号而不是字节偏移量,但您需要将输入格式配置为从 isSplittable 方法返回 false(多个映射器不会处理大型输入文件)。如果您有小文件,或者文件大小接近 HDFS 块大小的文件,那么这不是问题。
-
您还可以使用 pig 来清理您的数据并获取那些特定的相关行并处理该特定数据。
我觉得这是Hadoop的一个缺点,当你想在不同的系统之间共享全局状态时,Hadoop会失败。
我会尝试在第一个MapReduce作业中添加这些行号。然后,您可以执行MapReduce作业,包括在Mapper中包括一些负责检查行号的代码,以便丢弃整行或执行分析。
编辑:我现在认为第一个 MR 作业无法实现,因为映射器的问题将与原始问题相同:它们将收到拆分,而根本没有关于它在整个大文件中的位置的参考。