MapReduce job runs but throws an exception



Here is my code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySort extends Configured implements Tool {
public static void main(String[] args) {
    try {
        ToolRunner.run(new Configuration(), new SecondarySort(), args);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {
    @Override
    public int getPartition(StockKey arg0, DoubleWritable arg1, int arg2) {
        int partition = arg0.name.hashCode() % arg2;
        return partition;
    }
    @Override
    public void configure(JobConf job) {
    }
}

static class StockKey implements WritableComparable<StockKey> {
    String name;
    Long timestamp;
    public StockKey() {
    }
    StockKey(String name, Long timestamp){
        this.name = name;
        this.timestamp = timestamp;
    }
    @Override
    public void readFields(DataInput arg0) throws IOException {
        name = WritableUtils.readString(arg0);
        timestamp = arg0.readLong();
    }
    @Override
    public void write(DataOutput arg0) throws IOException {
        WritableUtils.writeString(arg0, name);
        arg0.writeLong(timestamp);
    }
    @Override
    public int compareTo(StockKey arg0) {
        int result = 0;
        result = name.compareToIgnoreCase(arg0.name);
        if(result == 0)
            result = timestamp.compareTo(arg0.timestamp);   
        return result;
    }
    public String toString() {
        String outputString = name+","+timestamp;
        return outputString;
    }
}

static class StockReducer implements Reducer<StockKey, DoubleWritable, Text, Text> {
    public void reduce(StockKey key, Iterator<DoubleWritable> value,
            OutputCollector<Text, Text> context, Reporter reporter)
                throws IOException {
        Text k = new Text(key.toString());
        while(value.hasNext()) {
            Double v = value.next().get();
            Text t = new Text(v.toString());
            context.collect(k, t);          
        }
    }
    @Override
    public void configure(JobConf job) {
        // TODO Auto-generated method stub
    }
    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    }
}

static class StockMapper implements Mapper<LongWritable, Text, StockKey, DoubleWritable> {
    public void map(LongWritable offset, Text value,
            OutputCollector<StockKey, DoubleWritable> context, Reporter reporter)
                throws IOException {
        String[] values = value.toString().split(",");
        StockKey key = new StockKey(values[0].trim(), Long.parseLong(values[1].trim()));
        DoubleWritable val = new DoubleWritable(Double.parseDouble(values[2].trim()));
        context.collect(key, val);
    }
    @Override
    public void configure(JobConf job) {
        // TODO Auto-generated method stub
    }
    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    }
}
@SuppressWarnings("unchecked")
@Override
public int run(String[] arg) throws Exception {
    JobConf conf = new JobConf(getConf(), SecondarySort.class);
    conf.setJobName(SecondarySort.class.getName());
    conf.setJarByClass(SecondarySort.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setMapOutputKeyClass(StockKey.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setPartitionerClass((Class<? extends Partitioner<StockKey, DoubleWritable>>) KeyPartitioner.class);
    conf.setMapperClass((Class<? extends Mapper<LongWritable, Text, StockKey, DoubleWritable>>) StockMapper.class);
    conf.setReducerClass((Class<? extends Reducer<StockKey, DoubleWritable, Text, Text>>) StockReducer.class);
    FileInputFormat.addInputPath(conf, new Path(arg[0]));
    FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
    JobClient.runJob(conf);
    return 0;
}
}

And here is the exception:

java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
    at SecondarySort$StockMapper.map(SecondarySort.java:135)
    at SecondarySort$StockMapper.map(SecondarySort.java:1)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)

12/07/13 03:22:32 INFO mapred.JobClient: Task Id : attempt_201207130314_0002_m_000001_2, Status : FAILED
java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
    at SecondarySort$StockMapper.map(SecondarySort.java:135)
    at SecondarySort$StockMapper.map(SecondarySort.java:1)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)

There are a number of potential problems with this code:

  • StockKey - you should override the default hashCode() method. At the moment, two StockKey objects with identical contents will have different hashCode values (if you don't override the JVM default, it returns a number that is, for all intents and purposes, the memory address of the object). I know that in your partitioner you only use the name field (which is a String and does have a valid hashCode() implementation), but it's good practice in case you ever use the hashCode() of the whole StockKey object and wonder why two identical StockKey objects end up in different reducers. A sketch of such an override follows:
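
For illustration, a minimal sketch of an overridden hashCode() (with a matching equals()) for StockKey, assuming both name and timestamp define key identity; the 31-multiplier scheme here is just one common convention:

    @Override
    public int hashCode() {
        // Combine both fields so equal keys always produce equal hash codes.
        int result = (name != null) ? name.hashCode() : 0;
        result = 31 * result + ((timestamp != null) ? timestamp.hashCode() : 0);
        return result;
    }

    // hashCode() and equals() should always be kept consistent with each other.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof StockKey)) return false;
        StockKey other = (StockKey) obj;
        return name.equals(other.name) && timestamp.equals(other.timestamp);
    }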

  • KeyPartitioner - you need to Math.abs(..) the result of arg0.name.hashCode(). At the moment this value can be negative, and taking the modulo with the number of reducers will then yield a negative partition number. The knock-on effect is that the MR framework will throw an exception, because it expects a number between 0 (inclusive) and the number of reducers (exclusive). This may well be where your problem lies, as I'll explain in the next point. A guarded version is sketched below:
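
A minimal sketch of the guarded getPartition, keeping the name-only scheme used above:

    @Override
    public int getPartition(StockKey key, DoubleWritable value, int numPartitions) {
        // Guard against a negative hashCode. Note Math.abs(Integer.MIN_VALUE)
        // is still negative; (key.name.hashCode() & Integer.MAX_VALUE) is an
        // alternative that also covers that corner case.
        return Math.abs(key.name.hashCode()) % numPartitions;
    }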

  • Mapper.map method - you are swallowing any potential output exceptions when you call context.collect. Continuing my previous point about the partitioner: if it returns a negative partition number, an exception will be thrown, and you need to deal with it. In some cases it can be acceptable to catch and swallow an exception (data validation of input records, say), but any exception that occurs on output should be thrown up to the MR framework, to flag that something went wrong and that this mapper's output is bad/incomplete. This is the pattern to avoid:

    try {
        context.collect(key, val);
    } catch (IOException e) {
        e.printStackTrace();
    }
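
For contrast, a minimal sketch of the propagating version; since map() already declares throws IOException, the call needs no wrapper at all:

    // Let any failure in collect() reach the MR framework, so the task
    // attempt is flagged as failed instead of silently losing records.
    context.collect(key, val);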
    
  • Finally, you need to explicitly declare the map and reduce output types (this is what is causing the exception: you currently declare the map output value type as Text, when in fact the mapper is outputting a DoubleWritable):

    conf.setMapOutputKeyClass(StockKey.class);
    conf.setMapOutputValueClass(DoubleWritable.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

I would suggest removing the try/catch block around the context.collect call and rerunning the job (or just checking the logs for the map task to see whether a stack trace shows up).
