有这样一个示例记录,100年,1:2:3
我想标准化为
100年,1
100年,2
100年,3
我的一个同事写了一个pig脚本来实现这一点,而我的MapReduce代码花了更多的时间。我之前使用的是默认的TextInputformat。但是为了提高性能,我决定编写一个带有自定义RecordReader的自定义Input格式类。以LineRecordReader类为参考,我尝试编写以下代码:
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
import com.normalize.util.Splitter;
public class NormalRecordReader extends RecordReader<Text, Text> {
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private Text key = null;
private Text value = null;
private Text line = null;
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
in = new LineReader(fileIn, job);
this.pos = start;
}
public boolean nextKeyValue() throws IOException {
int newSize = 0;
if (line == null) {
line = new Text();
}
while (pos < end) {
newSize = in.readLine(line);
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}
// line too long. try again
System.out.println("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
Splitter splitter = new Splitter(line.toString(), ",");
List<String> split = splitter.split();
if (key == null) {
key = new Text();
}
key.set(split.get(0));
if (value == null) {
value = new Text();
}
value.set(split.get(1));
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
@Override
public Text getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}
public synchronized void close() throws IOException {
if (in != null) {
in.close();
}
}
}
虽然这工作,但我没有看到任何性能提高。这里我在","处打破记录,并将100设置为键,将1,2,3设置为值。我只调用做以下事情的映射器:
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
try {
Splitter splitter = new Splitter(value.toString(), ":");
List<String> splits = splitter.split();
for (String split : splits) {
context.write(key, new Text(split));
}
} catch (IndexOutOfBoundsException ibe) {
System.err.println(value + " is malformed.");
}
}
splitter类用于拆分数据,因为我发现String的拆分器速度较慢。方法是:
public List<String> split() {
List<String> splitData = new ArrayList<String>();
int beginIndex = 0, endIndex = 0;
while(true) {
endIndex = dataToSplit.indexOf(delim, beginIndex);
if(endIndex == -1) {
splitData.add(dataToSplit.substring(beginIndex));
break;
}
splitData.add(dataToSplit.substring(beginIndex, endIndex));
beginIndex = endIndex + delimLength;
}
return splitData;
}
代码可以以任何方式改进吗?
让我在这里总结一下我认为你可以改进的地方,而不是在评论中:
-
如前所述,当前您为每条记录创建
Text
对象数次(次数将等于您的令牌数量)。虽然对于小的投入来说,这可能无关紧要,但对于规模较大的工作来说,这可能是一个大问题。要解决这个问题,请执行以下操作:private final Text text = new Text(); public void map(Text key, Text value, Context context) { .... for (String split : splits) { text.set(split); context.write(key, text); } }
-
对于你的拆分,你现在所做的是为每条记录分配一个新的数组,填充这个数组,然后迭代这个数组来写你的输出。实际上,在这种情况下,你并不需要一个数组,因为你不需要维护任何状态。使用您提供的
split
方法的实现,您只需要传递一次数据:public void map(Text key, Text value, Context context) { String dataToSplit = value.toString(); String delim = ":"; int beginIndex = 0; int endIndex = 0; while(true) { endIndex = dataToSplit.indexOf(delim, beginIndex); if(endIndex == -1) { text.set(dataToSplit.substring(beginIndex)); context.write(key, text); break; } text.set(dataToSplit.substring(beginIndex, endIndex)); context.write(key, text); beginIndex = endIndex + delim.length(); } }
-
我真的不明白为什么你写自己的
InputFormat
,似乎KeyValueTextInputFormat
正是你所需要的,可能已经优化了。下面是它的用法:conf.set("key.value.separator.in.input.line", ","); job.setInputFormatClass(KeyValueTextInputFormat.class);
-
根据您的示例,每个记录的键似乎是一个整数。如果总是这样,那么使用
Text
作为您的映射器输入键不是最佳的,它应该是IntWritable
,甚至可能是ByteWritable
,这取决于您的数据。 -
类似地,你想要使用
IntWritable
或ByteWritable
作为你的映射器输出键和输出值。
同样,如果你想要一些有意义的基准,你应该在一个更大的数据集上测试,比如几个gb。1分钟的测试并没有真正的意义,特别是在分布式系统的环境中。
当输入量小时,一个作业可能比另一个作业运行得快,但当输入量大时,这种趋势可能会逆转。话是这么说的,你也应该知道Pig在转换到Map/Reduce时做了很多优化,所以我并不太惊讶它比你的Java Map/Reduce代码运行得快,我过去也看到过这种情况。尝试我建议的优化,如果它仍然不够快,这里有一个关于分析你的Map/Reduce作业的链接,其中有一些更有用的技巧(特别是关于分析的技巧7,我发现它很有用)。