IOException after map and before reduce

I have a dataset in which every record consists of two fields:

  • URL (without a prefix)
  • lifetime (in seconds)

I want to compute the average lifetime (in days) for each domain. For example, if I have these two records:

hadoop.apache.org/docs/current 22118400
hadoop.apache.org/docs/current/api/org/ 27820800

I should get the answer:

hadoop.apache.org 289
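
(Check: both records belong to hadoop.apache.org, so the average lifetime is (22118400 + 27820800) / 2 = 24969600 seconds, and 24969600 / 86400 = 289 days.)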

For these calculations I wrote the following Hadoop job:

package ru.bdata.siteslifes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import ru.bdata.siteslifes.arrays.IntArrayWritable;
import java.io.IOException;
import java.util.Iterator;
public class AvgSiteLifeCounter extends Configured implements Tool {

    // Mapper: emits (domain, lifetime in seconds), where the domain is the
    // URL prefix up to the first '/' (assumes every URL contains one).
    public static class DomainMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
        @Override
        public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException{
            String url = key.toString();
            context.write(new Text(url.substring(0, url.indexOf('/'))), value);
        }
    }
    // Averages the lifetimes seen for a domain and converts seconds to days.
    // Set as the combiner below; the identity Reducer then forwards its output.
    public static class AvgCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        private static final int SEC_IN_DAY = 86400;
        @Override
        public void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException,InterruptedException{
            float sum = 0;
            int cnt = 0;
            Iterator<IntWritable> it = value.iterator();
            while (it.hasNext()){
                sum += it.next().get();
                cnt++;
            }
            context.write(key, new IntWritable(Math.round(sum / (cnt * SEC_IN_DAY))));
        }
    }
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf);
        job.setJarByClass(AvgSiteLifeCounter.class);
        job.setMapperClass(DomainMapper.class);
        job.setCombinerClass(AvgCombiner.class);
        job.setReducerClass(Reducer.class);   // identity reducer
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(8);
        SequenceFileInputFormat.addInputPath(job, new Path(strings[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(strings[1]));
        return job.waitForCompletion(true)? 0: 1;
    }
}

When I run the program on the cluster, the map part works fine, but before the reducer part starts I see this exception:

java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1305)
    at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:74)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:551)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:99)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:144)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.ja...

How should I change the code so that the reduce part works as well?

Since this job is only one stage of a larger Hadoop pipeline, the input and output data are stored as binary SequenceFiles.

P.S. As you can see, I don't use LongWritable anywhere, only IntWritable, yet the exception log mentions LongWritable.
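
One quick way to rule out the input data itself is to print the key/value classes recorded in the header of a SequenceFile. A minimal standalone sketch (the class name is mine, not part of the job above; the file path comes from the command line):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical helper: prints the key/value classes a SequenceFile was written with.
public class InspectSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader =
                new SequenceFile.Reader(fs, new Path(args[0]), conf);
        try {
            // The file header stores the classes used when it was written.
            System.out.println("key class:   " + reader.getKeyClassName());
            System.out.println("value class: " + reader.getValueClassName());
        } finally {
            reader.close();
        }
    }
}

Alternatively, hadoop fs -text <file> decodes a SequenceFile to stdout if you just want to eyeball a few records.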

After the map phase Hadoop writes the intermediate results to temporary files, which the reducers then read; the reducers' own output is written through the job's OutputFormat. The run() method above declares the key/value classes of the map output, but never the key/value classes of the job's final output, and the output key class defaults to LongWritable. That default is exactly the class the exception complains about. My code started working once I also set the final output classes in run() by adding these lines:

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

It is a bit confusing that Hadoop reports a plain IOException when the real problem is a type mismatch between what the driver code declared for the job and the types the job actually produces at runtime.
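
For reference, a minimal sketch of the corrected run() method, using the same class names as above (only the two setOutput* lines are new):

@Override
public int run(String[] strings) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf);
    job.setJarByClass(AvgSiteLifeCounter.class);
    job.setMapperClass(DomainMapper.class);
    job.setCombinerClass(AvgCombiner.class);
    job.setReducerClass(Reducer.class);               // identity reducer
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);             // intermediate (map output) types
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);                // final (job output) types: the fix
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(8);
    SequenceFileInputFormat.addInputPath(job, new Path(strings[0]));
    SequenceFileOutputFormat.setOutputPath(job, new Path(strings[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}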
