Error when using MultithreadedMapper



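I'm running into the same problem described in this question (Type mismatch in key from map when replacing Mapper with MultithreadedMapper), but the answer there doesn't work for me.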

The error message I get looks like this:

13/09/17 10:37:38 INFO mapred.JobClient: Task Id : attempt_201309170943_0006_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1019)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

Here is my main method:

public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    if (args.length != 5) {
        System.out.println("Invalid Arguments");
        print_usage();
        throw new IllegalArgumentException();
    }
    config.set("myfirstdata", args[0]);
    config.set("myseconddata", args[1]);
    config.set("mythirddata", args[2]);
    config.set("mykeyattribute", "GK");
    config.setInt("myy", 50);
    config.setInt("myx", 49);
    // additional attributes
    config.setInt("myobjectid", 1);
    config.setInt("myplz", 3);
    config.setInt("mygenm", 4);
    config.setInt("mystnm", 6);
    config.setInt("myhsnr", 7);
    config.set("mapred.textoutputformat.separator", ";");
    Job job = new Job(config);
    job.setJobName("MySample");
    // set the map output types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // set the final outputs for the Job
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    MultithreadedMapper.setMapperClass(job, MyMapper.class);
    job.setReducerClass(MyReducer.class);
    // In our case the combiner is the same as the reducer. This is possible
    // for reducers that are both commutative and associative.
    job.setCombinerClass(MyReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextInputFormat.setInputPaths(job, new Path(args[3]));
    TextOutputFormat.setOutputPath(job, new Path(args[4]));
    job.setJarByClass(MySampleDriver.class);
    MultithreadedMapper.setNumberOfThreads(job, 2);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

The mapper code looks like this:

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    ...
    /**
     * Sets up mapper with filter geometry provided as argument[0] to the jar
     */
    @Override
    public void setup(Context context) {
        ...
    }

    @Override
    public void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {
        ...
        // We know that the first line of the CSV is just headers, so at byte
        // offset 0 we can just return
        if (key.get() == 0)
            return;
        String line = val.toString();
        String[] values = line.split(";");
        float latitude = Float.parseFloat(values[latitudeIndex]);
        float longitude = Float.parseFloat(values[longitudeIndex]);
        ...
        // Create our Point directly from longitude and latitude
        Point point = new Point(longitude, latitude);
        IntWritable one = new IntWritable();

        if (...) {
            int name = ...
            one.set(name);
            String out = ...
            context.write(new Text(out), one);
        } else {
            String out = ...
            context.write(new Text(out), new IntWritable(-1));
        }
    }
}
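The header check in map() relies on TextInputFormat keying each line by its byte offset in the input file, so only the first line of each file arrives with key 0. The sketch below shows that offset arithmetic in plain Java, assuming UTF-8 text with '\n' line endings. lineStartOffsets and LineOffsetsDemo are hypothetical names for illustration only, not Hadoop's LineRecordReader, which also handles '\r\n' and input splits.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineOffsetsDemo {

    /** Hypothetical helper: byte offset at which each '\n'-terminated line starts. */
    static List<Long> lineStartOffsets(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        List<Long> offsets = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] == '\n') {
                offsets.add(start); // this line began at 'start'
                start = i + 1;      // next line begins after the newline
            }
        }
        if (start < bytes.length) {
            offsets.add(start);     // last line without a trailing newline
        }
        return offsets;
    }

    public static void main(String[] args) {
        // A made-up CSV: header line first, then two data lines.
        String csv = "id;lat;lon\n1;52.5;13.4\n2;48.1;11.6\n";
        System.out.println(lineStartOffsets(csv)); // [0, 11, 23]
    }
}
```

Only the header line gets offset 0, which is why the key.get() == 0 test skips exactly that line (once per input file).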

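You forgot to set the mapper class. MultithreadedMapper.setMapperClass(job, MyMapper.class) only tells MultithreadedMapper which mapper to run in its threads; it does not make MultithreadedMapper the job's mapper. The job therefore still runs the default Mapper, whose map() writes each input key/value pair out unchanged (that is the Mapper.map(Mapper.java:124) frame in the stack trace). As a result the LongWritable byte offset produced by TextInputFormat reaches the output collector, which expects the Text key declared with setMapOutputKeyClass. Add the following line to your driver:

job.setMapperClass(MultithreadedMapper.class);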
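To make the two separate settings concrete, here is a toy model in plain Java. It is a sketch under stated assumptions, not Hadoop's API: ToyMapper, ToyJob, multithreadedFor and MapperWiringDemo are hypothetical stand-ins, String plays the role of Text, Long plays LongWritable, and the thread pool is omitted. The job's own mapper defaults to an identity mapping, while the "inner" mapper is only used if the multithreaded wrapper is installed as the job's mapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Toy model only: hypothetical stand-ins, NOT Hadoop's API.
public class MapperWiringDemo {

    /** Stand-in for a Mapper: receives one record and emits (key, value) pairs. */
    interface ToyMapper {
        void map(Object key, Object value, BiConsumer<Object, Object> out);
    }

    /** Like the base Mapper.map(): emits the input key and value unchanged. */
    static final ToyMapper IDENTITY = (key, value, out) -> out.accept(key, value);

    /** Stand-in for MyMapper: emits a String key (for Text) and an Integer value. */
    static final ToyMapper MY_MAPPER = (key, value, out) -> out.accept(value.toString(), 1);

    /** Stand-in for the Job: two independent settings. */
    static final class ToyJob {
        ToyMapper jobMapper = IDENTITY; // like job.setMapperClass(...); identity by default
        ToyMapper multithreadedInner;   // like MultithreadedMapper.setMapperClass(job, ...)
    }

    /** Stand-in for MultithreadedMapper: delegates each record to the inner mapper (threads omitted). */
    static ToyMapper multithreadedFor(ToyJob job) {
        return (key, value, out) -> job.multithreadedInner.map(key, value, out);
    }

    /** Feeds one record with a Long byte-offset key through the job's mapper and checks the emitted key type. */
    static String run(ToyJob job, Class<?> expectedKeyClass) {
        List<String> errors = new ArrayList<>();
        long byteOffset = 11L; // TextInputFormat-style key
        job.jobMapper.map(byteOffset, "1;52.5;13.4", (k, v) -> {
            if (!expectedKeyClass.isInstance(k)) {
                errors.add("Type mismatch in key from map: expected "
                        + expectedKeyClass.getSimpleName() + ", received "
                        + k.getClass().getSimpleName());
            }
        });
        return errors.isEmpty() ? "OK" : errors.get(0);
    }

    public static void main(String[] args) {
        // As in the question: only the inner mapper is configured.
        ToyJob broken = new ToyJob();
        broken.multithreadedInner = MY_MAPPER;
        System.out.println(run(broken, String.class));

        // With the fix: the job's mapper is the multithreaded wrapper.
        ToyJob fixed = new ToyJob();
        fixed.multithreadedInner = MY_MAPPER;
        fixed.jobMapper = multithreadedFor(fixed);
        System.out.println(run(fixed, String.class));
    }
}
```

Running it prints "Type mismatch in key from map: expected String, received Long" for the first job and "OK" for the second, mirroring why both calls are needed in the real driver.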
