使用MapReduce进行规范化



有这样一个示例记录,100年,1:2:3

我想标准化为
100年,1
100年,2
100年,3

我的一个同事写了一个pig脚本来实现这一点,而我的MapReduce代码花了更多的时间。我之前使用的是默认的TextInputformat。但是为了提高性能,我决定编写一个带有自定义RecordReader的自定义Input格式类。以LineRecordReader类为参考,我尝试编写以下代码:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
import com.normalize.util.Splitter;
public class NormalRecordReader extends RecordReader<Text, Text> {
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private Text key = null;
    private Text value = null;
    private Text line = null;
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        in = new LineReader(fileIn, job);
        this.pos = start;
    }
    public boolean nextKeyValue() throws IOException {
        int newSize = 0;
        if (line == null) {
            line = new Text();
        }
        while (pos < end) {
            newSize = in.readLine(line);
            if (newSize == 0) {
                break;
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
            // line too long. try again
            System.out.println("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        Splitter splitter = new Splitter(line.toString(), ",");
        List<String> split = splitter.split();
        if (key == null) {
            key = new Text();
        }
        key.set(split.get(0));
        if (value == null) {
            value = new Text();
        }
        value.set(split.get(1));
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
    @Override
    public Text getCurrentKey() {
        return key;
    }
    @Override
    public Text getCurrentValue() {
        return value;
    }
    /**
     * Get the progress within the split
     */
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float)(end - start));
        }
    }
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close(); 
        }
    }
}

虽然这工作,但我没有看到任何性能提高。这里我在","处打破记录,并将100设置为键,将1,2,3设置为值。我只调用做以下事情的映射器:

public void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException {
    try {
        Splitter splitter = new Splitter(value.toString(), ":");
        List<String> splits = splitter.split();
        for (String split : splits) {
            context.write(key, new Text(split));
        }
    } catch (IndexOutOfBoundsException ibe) {
        System.err.println(value + " is malformed.");
    }
}

splitter类用于拆分数据,因为我发现String的拆分器速度较慢。方法是:

public List<String> split() {
    List<String> splitData = new ArrayList<String>();
    int beginIndex = 0, endIndex = 0;
    while(true) {
        endIndex = dataToSplit.indexOf(delim, beginIndex);
        if(endIndex == -1) {
            splitData.add(dataToSplit.substring(beginIndex));
            break;
        }
        splitData.add(dataToSplit.substring(beginIndex, endIndex));
        beginIndex = endIndex + delimLength;
    }
    return splitData;
}

代码可以以任何方式改进吗?

让我在这里总结一下我认为你可以改进的地方,而不是在评论中:

  • 如前所述,当前您为每条记录创建Text对象数次(次数将等于您的令牌数量)。虽然对于小的投入来说,这可能无关紧要,但对于规模较大的工作来说,这可能是一个大问题。要解决这个问题,请执行以下操作:

    private final Text text = new Text();
    public void map(Text key, Text value, Context context) {
        ....
        for (String split : splits) {
            text.set(split);
            context.write(key, text);
        }
    }
    
  • 对于你的拆分,你现在所做的是为每条记录分配一个新的数组,填充这个数组,然后迭代这个数组来写你的输出。实际上,在这种情况下,你并不需要一个数组,因为你不需要维护任何状态。使用您提供的split方法的实现,您只需要传递一次数据:

    public void map(Text key, Text value, Context context) {
        String dataToSplit = value.toString();
        String delim = ":";
        int beginIndex = 0;
        int endIndex = 0;
        while(true) {
            endIndex = dataToSplit.indexOf(delim, beginIndex);
            if(endIndex == -1) {
                text.set(dataToSplit.substring(beginIndex));
                context.write(key, text);
                break;
            }
            text.set(dataToSplit.substring(beginIndex, endIndex));
            context.write(key, text);
            beginIndex = endIndex + delim.length();
        }
    }
    
  • 我真的不明白为什么你写自己的InputFormat,似乎KeyValueTextInputFormat正是你所需要的,可能已经优化了。下面是它的用法:

    conf.set("key.value.separator.in.input.line", ",");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    
  • 根据您的示例,每个记录的键似乎是一个整数。如果总是这样,那么使用Text作为您的映射器输入键不是最佳的,它应该是IntWritable,甚至可能是ByteWritable,这取决于您的数据。

  • 类似地,你想要使用IntWritableByteWritable作为你的映射器输出键和输出值。

同样,如果你想要一些有意义的基准,你应该在一个更大的数据集上测试,比如几个gb。1分钟的测试并没有真正的意义,特别是在分布式系统的环境中。

当输入量小时,一个作业可能比另一个作业运行得快,但当输入量大时,这种趋势可能会逆转。

话是这么说的,你也应该知道Pig在转换到Map/Reduce时做了很多优化,所以我并不太惊讶它比你的Java Map/Reduce代码运行得快,我过去也看到过这种情况。尝试我建议的优化,如果它仍然不够快,这里有一个关于分析你的Map/Reduce作业的链接,其中有一些更有用的技巧(特别是关于分析的技巧7,我发现它很有用)。

相关内容

  • 没有找到相关文章

最新更新