如何使用AVRO org.apache.avro.mapreduce接口编程



我所有的程序都是用hadoop的新MR1接口(org.apache.hadoop.mapreduce(编写的,所以我也想使用avro的新org.apache.avro.mapreduce。但它对我不起作用。

该程序接受 avro 数据的输入并输出相同的数据。我的程序背后的主要思想是针对avro包装的键/值对hadoop的Mapper和Reducer进行子类化。这是我的工作驱动程序的一个块:

    AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
    job.setMapperClass(MyAvroMap.class);
    job.setReducerClass(MyAvroReduce.class);
    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

MyAvroMap 和 MyAvroReduce 子类的定义分别是

public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                AvroKey<NetflowRecord>, NullWritable>{ ... }

蛋氨酸的NetflowRecord是我的avro记录类。我遇到了运行异常

java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey

通过阅读hadoop和avro的源代码,我发现异常是由 JobConf 抛出的,以确保映射键是 WritableComparable 的一个子类,像这样(Hadoop1.2.1,759 行(

WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));

但是 avro 表明 AvroKey 和 AvroValue 只是一个简单的包装器,没有子类化 Hadoop 的可写 * 接口。

我相信,即使没有测试,我也可以使用旧的映射接口,但这不是我想要的。你能给我一些关于使用纯org.apache.avro.mapreduce接口编程的例子或解释吗?

真诚地

杰明

经过艰苦的搜索,借助这个补丁 https://issues.apache.org/jira/browse/AVRO-593,我发现每个 AvroKey 和 AvroValue 包装器在作业配置中都必须有一个架构定义。这就是我错过的。

在这里,我有两个选择:

  1. 如果保持MyAvroMap和MyAvroReduce不变,我必须为CharSequence定义一个模式,并使用AvroJob为映射器输出声明此模式,例如

    AvroJob.setMapOutputKeySchema(job, <"defined-schema-for-charsequence">(;AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema(((;

  2. 通过将映射器输出键/值更改为文本/AvroValue,我只需要为映射器输出值添加模式声明,例如

    job.setMapOutputKeyClass(Text.class(;AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema(((;

使用mapreduce API,我们不再需要对AvroMapper和AvroReducer进行子类化。在这里,我在代码中实现了选项2,而无需其他架构定义。

杰明

相关内容

  • 没有找到相关文章

最新更新