我是Hadoop的新手,遇到了以下问题。我试图做的是将数据库的一个碎片映射到一个映射器(请不要问我为什么需要这样做等),然后对这些数据进行某些操作,将结果输出到reducers,然后再次使用该输出,使用相同的碎片格式对同一数据进行第二阶段的映射/减少工作。Hadoop不提供任何输入方法来发送数据库的碎片。只能使用LineInputFormat
和LineRecordReader
逐行发送。NLineInputFormat
在这种情况下也没有帮助。我需要扩展FileInputFormat
和RecordReader
类来编写我自己的InputFormat
。建议我使用LineRecordReader
,因为底层代码已经处理了FileSplits
以及与拆分文件相关的所有问题。我现在所需要做的就是重写nextKeyValue()
方法,我不知道该怎么做。
for(int i=0;i<shard_size;i++){
if(lineRecordReader.nextKeyValue()){
lineValue.append(lineRecordReader.getCurrentValue().getBytes(),0,lineRecordReader.getCurrentValue().getLength());
}
}
上面的代码片段是编写的,但不知何故工作不好。
我建议在输入文件中放入连接字符串和一些其他指示,以便在哪里找到碎片
映射器将获取这些信息,连接到数据库并完成一项工作。我不建议将结果集转换为hadoop的可写类——这会阻碍性能
我认为需要解决的问题是,对这个相对较小的输入进行足够的拆分。你可以简单地创建足够多的小文件,每个文件都有几个碎片引用,或者你可以调整输入格式来构建小的分割。第二种方式将更加灵活。
我所做的事情是这样的。我编写了自己的记录读取器,一次读取n行,并将它们作为输入发送给映射器
public boolean nextKeyValue() throws IOException,
中断异常{
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 5; i++) {
if (!lineRecordReader.nextKeyValue()) {
return false;
}
lineKey = lineRecordReader.getCurrentKey();
lineValue = lineRecordReader.getCurrentValue();
sb.append(lineValue.toString());
sb.append(eol);
}
lineValue.set(sb.toString());
//System.out.println(lineValue.toString());
return true;
// throw new UnsupportedOperationException("Not supported yet.");
}
如何精简