在我的MapReduce程序的Reduce阶段,我执行的唯一操作是连接提供的迭代器中的每个值,如下所示:
public void reduce(Text key, Iterator<text> values,
OutputCollector<Text, Text> output, Reporter reporter) {
Text next;
Text outKey = new Text()
Text outVal = new Text();
StringBuilder sb = new StringBuilder();
while(values.hasNext()) {
next = values.next();
sb.append(next.toString());
if (values.hasNext())
sb.append(',');
}
outKey.set(key.toString());
outVal.set(sb.toSTring());
output.collect(outKey,outVal);
}
我的问题是,一些reduce输出值是巨大的文本行;如此之大,以至于即使初始大小非常大,字符串缓冲区也必须将其大小增加(加倍)数倍以容纳迭代器的所有上下文,从而导致内存问题。
在传统的 Java 应用程序中,这表明对文件的缓冲写入将是写入输出的首选方法。 如何在Hadoop中处理非常大的输出键值对? 我应该将结果直接流式传输到 HDFS 上的文件(每个减少调用一个文件)吗? 有没有办法缓冲输出,而不是output.collect方法?
注意:我已经最大限度地增加了内存/堆大小。 此外,一些消息来源表明,增加化简器的数量可以帮助解决内存/堆问题,但这里的问题直接可以追溯到在扩展其容量时使用 SringBuilder。
谢谢
并不是说我明白为什么你想要一个巨大的价值,但有一种方法可以做到这一点。
如果编写自己的 OutputFormat,则可以修复 RecordWriter.write(Key, Value)
方法的行为,以根据键值是否为 null 来处理值串联。
这样,在化简器中,您可以按如下方式编写代码(键的第一个输出是实际键,之后的所有内容都是空键:
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter) {
boolean firstKey = true;
for (Text value : values) {
output.collect(firstKey ? key : null, value);
firstKey = false;
}
}
然后,实际RecordWriter.write()
具有以下逻辑来处理空键/值串联逻辑:
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
// if we've written data before, append a new line
if (dataWritten) {
out.write(newline);
}
// write out the key and separator
writeObject(key);
out.write(keyValueSeparator);
} else if (!nullValue) {
// write out the value delimiter
out.write(valueDelimiter);
}
// write out the value
writeObject(value);
// track that we've written some data
dataWritten = true;
}
public synchronized void close(Reporter reporter) throws IOException {
// if we've written out any data, append a closing newline
if (dataWritten) {
out.write(newline);
}
out.close();
}
您会注意到 close 方法也已修改为在写出的最后一条记录中写入尾随换行符
完整的代码清单可以在 pastebin 上找到,下面是测试输出:
key1 value1
key2 value1,value2,value3
key3 value1,value2
如果单个输出键值可以大于内存,则意味着标准输出机制不适合 - 因为通过 inerface 设计,它需要传递键值对而不是流。
我认为最简单的解决方案是将输出直接流式传输到 HDFS 文件。
如果您有理由通过输出格式传递数据 - 我建议以下解决方案:a) 写信给当地的临时目录
b) 将文件名作为输出格式的值传递。
可能最有效但有点复杂的解决方案是使用内存映射文件作为缓冲区。只要有足够的内存,它就会在内存中,并且在需要时,操作系统将关心有效地溢出到磁盘。