Hadoop - Passing multiple files from the record reader to the map function



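I have implemented a custom combine file input format so that each map task gets a split made up of a group of files. My first solution passed each file of the split through the record reader separately, and everything worked fine. Now I am trying to pass the whole set of files to the map function at once.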

This is my record reader code:

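```java
import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MultiImagesRecordReader extends
        RecordReader<Text[], BytesWritable[]> {
    private long start = 0;
    private long end = 0;
    private int pos = 0;
    private BytesWritable[] value;
    private Text[] key;
    private CombineFileSplit split;
    private Configuration conf;
    private FileSystem fs;
    // Instance field: a static flag would be shared by every reader in the JVM.
    private boolean recordsRead;

    // (CombineFileSplit, TaskAttemptContext, Integer) is the constructor signature
    // CombineFileRecordReader uses to create one reader per file index;
    // `index` is not used by this reader.
    public MultiImagesRecordReader(CombineFileSplit split,
            TaskAttemptContext context, Integer index) throws IOException {
        this.split = split;
        this.conf = context.getConfiguration();
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        start = split.getOffset(0);
        end = start + split.getLength();
        recordsRead = false;
        this.pos = (int) start;
        fs = FileSystem.get(conf);
        value = new BytesWritable[split.getNumPaths()];
        key = new Text[split.getNumPaths()];
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (recordsRead) {
            System.out.println("Sono nel next true" + InetAddress.getLocalHost());
            return false;
        }
        recordsRead = true;
        System.out.println("Sono nel next false" + InetAddress.getLocalHost());
        // Emit a single record: every file of the split, read fully into memory.
        for (int i = 0; i < split.getNumPaths(); i++) {
            int fileLength = (int) split.getLength(i);
            Path path = split.getPath(i);
            byte[] result = new byte[fileLength];
            key[i] = new Text(path.toString());
            FSDataInputStream in = null;
            try {
                in = fs.open(path);
                IOUtils.readFully(in, result, 0, fileLength);
            } finally {
                IOUtils.closeStream(in);
            }
            value[i] = new BytesWritable(result);
        }
        return true;
    }

    // The remaining abstract methods are not shown in the question;
    // these are minimal versions so the class compiles.
    @Override
    public Text[] getCurrentKey() {
        return key;
    }

    @Override
    public BytesWritable[] getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return recordsRead ? 1.0f : 0.0f;
    }

    @Override
    public void close() {
        // Each input stream is already closed in nextKeyValue().
    }
}
```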

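With this code, the map function correctly receives the arrays of keys and values, but it receives them repeatedly: I expect map to be called once, but instead it is called several times. What am I doing wrong?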

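I assume you know that the Mapper's map() is called once for every record your reader returns through getCurrentKey() and getCurrentValue(), until all key-value pairs in the given split have been consumed. As I understand it, your map function is being called repeatedly with the same key-value pair, when it should be called only once for it. That means your record reader is returning the same record (key-value pair) more than once.

I have also implemented a custom combine file input format and record reader. You can see their generic form here, and their implementation in the same project here.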
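One likely way the same record ends up being returned several times can be modelled without a cluster. The input format is not shown in the question, so this sketch assumes its `createRecordReader` wraps `MultiImagesRecordReader` in Hadoop's `CombineFileRecordReader`, which is what the `(CombineFileSplit, TaskAttemptContext, Integer index)` constructor is designed for. `CombineFileRecordReader` builds a fresh inner reader for each file index in the split; since `MultiImagesRecordReader` ignores `index` and reads every file, each inner reader emits the whole group once, so map() runs once per file. The code below is a simplified, Hadoop-free model; `CombineReaderDemo`, `GroupReader`, `countMapCalls` and `countMapCallsWrapped` are hypothetical names, not Hadoop APIs.

```java
import java.util.Arrays;
import java.util.List;

// Hadoop-free model of the record reader / mapper interaction.
// CombineReaderDemo, GroupReader, countMapCalls and countMapCallsWrapped are
// hypothetical names; they are not Hadoop APIs.
public class CombineReaderDemo {

    // Stands in for MultiImagesRecordReader: emits the whole file group as one record.
    static class GroupReader {
        private final List<String> paths;
        private boolean recordsRead = false;

        GroupReader(List<String> paths) {
            this.paths = paths;
        }

        boolean nextKeyValue() {
            if (recordsRead) {
                return false;
            }
            recordsRead = true;
            return true;
        }

        List<String> getCurrentKey() {
            return paths;
        }
    }

    // Mirrors the loop in Mapper.run(): one map() call per record the reader returns.
    static int countMapCalls(GroupReader reader) {
        int calls = 0;
        while (reader.nextKeyValue()) {
            reader.getCurrentKey(); // map(key, value, context) would run here
            calls++;
        }
        return calls;
    }

    // Mirrors CombineFileRecordReader: a fresh inner reader for each file index.
    // Like MultiImagesRecordReader, the inner reader ignores the index and reads
    // every file, so the same group is emitted once per file.
    static int countMapCallsWrapped(List<String> split) {
        int calls = 0;
        for (int index = 0; index < split.size(); index++) {
            calls += countMapCalls(new GroupReader(split));
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList("img1.jpg", "img2.jpg", "img3.jpg");
        System.out.println("wrapped: " + countMapCallsWrapped(split)); // wrapped: 3
        System.out.println("direct: " + countMapCalls(new GroupReader(split))); // direct: 1
    }
}
```

Under that assumption, one way to get a single map() call per split is for the input format's `createRecordReader` to return `new MultiImagesRecordReader((CombineFileSplit) split, context, 0)` directly instead of wrapping it; the framework still calls `initialize()` on the returned reader. Keeping the wrapper and making each reader read only `split.getPath(index)` would instead give one map() call per file, which is the per-file behaviour described in the question.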
