How do I resolve a cast error from IntWritable to Mutation? (MapReduce with HBase)



When I try to pass an IntWritable from the mapper to the reducer, I get the following error:

INFO mapreduce.Job: Task Id : attempt_1413976354988_0009_r_000000_1, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.hbase.client.Mutation

Here is my mapper:

public class testMapper extends TableMapper<Object, Object>
{
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException
    {
        try
        {
            // get rowKey and convert it to string
            String inKey = new String(rowKey.get());
            // set new key having only date
            String oKey = inKey.split("#")[0];
            // get sales column in byte format first and then convert it to
            // string (as it is stored as string from hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = new String(bSales);
            Integer sales = new Integer(sSales);
            // emit date and sales values
            context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));
        }

Here is the reducer:

public class testReducer extends TableReducer<Object, Object, Object>
{
    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        try
        {
            int sum = 0;
            // loop through the sales values and add each one to sum
            for (IntWritable sales : values)
            {
                Integer intSales = new Integer(sales.toString());
                sum += intSales;
            }
            // create hbase put with rowkey as date
            Put insHBase = new Put(key.get());
            // insert sum value to hbase
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write data to Hbase table
            context.write(null, insHBase);

Driver:

public class testDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // define scan and define column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(testDriver.class);
        // define input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, ImmutableBytesWritable.class, IntWritable.class, job);
        // define output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);
        job.waitForCompletion(true);
    }
}

context.write(null,insHBase);

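The problem is that you are writing a Put to the context, while HBase is expecting an IntWritable.

You should write the output to the context and let HBase take care of storing it. HBase wants to store an IntWritable, but you gave it a Put operation, which extends Mutation.

The HBase workflow is that you configure where the output should go in the configuration, and then simply write the output to the context. You should not do any manual Put operations in the reducer.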
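
One more thing that may be worth checking, offered as an assumption about this code rather than a confirmed diagnosis: the reducer is declared as TableReducer<Object, Object, Object>, so a reduce method taking (ImmutableBytesWritable, Iterable<IntWritable>, Context) overloads the inherited reduce(Object, Iterable<Object>, Context) instead of overriding it. Hadoop's default Reducer.reduce forwards every value unchanged, which would hand IntWritable values to an output that expects a Mutation. The sketch below reproduces only that mechanism in plain Java. All class names in it (OverloadDemo, BaseReducer, Mutation, Put, UntypedReducer, TypedReducer) are hypothetical stand-ins, not the Hadoop or HBase classes, and the row key is made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverloadDemo {
    // Calls reduce() through the base type, the way a framework would.
    static <K, V> List<Object> run(BaseReducer<K, V> reducer, K key, Iterable<V> values) {
        reducer.reduce(key, values);
        return reducer.output;
    }

    // Mimics an output stage that only accepts Mutation values.
    static Mutation toMutation(Object value) {
        return (Mutation) value;
    }

    public static void main(String[] args) {
        // Made-up sample key and values, for illustration only.
        List<Object> passedThrough =
                run(new UntypedReducer(), "2014-10-22", Arrays.<Object>asList(1, 2, 3));
        try {
            toMutation(passedThrough.get(0));
        } catch (ClassCastException e) {
            System.out.println("untyped reducer failed: " + e.getMessage());
        }

        List<Object> summed = run(new TypedReducer(), "2014-10-22", Arrays.asList(1, 2, 3));
        Put put = (Put) toMutation(summed.get(0));
        System.out.println("typed reducer wrote row=" + put.row + " sum=" + put.sum);
    }
}

// Hypothetical stand-ins; these are NOT the HBase Mutation and Put classes.
class Mutation {}

class Put extends Mutation {
    final String row;
    final int sum;

    Put(String row, int sum) {
        this.row = row;
        this.sum = sum;
    }
}

// Mimics a reducer base class whose default reduce() forwards values unchanged.
class BaseReducer<K, V> {
    final List<Object> output = new ArrayList<>();

    void write(Object value) {
        output.add(value);
    }

    void reduce(K key, Iterable<V> values) {
        for (V value : values) {
            write(value);
        }
    }
}

// Type arguments are Object, so this reduce() is an overload and is never
// reached through the base type; adding @Override here would not compile.
class UntypedReducer extends BaseReducer<Object, Object> {
    void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        write(new Put(key, sum));
    }
}

// Type arguments match the method signature, so this reduce() is an override.
class TypedReducer extends BaseReducer<String, Integer> {
    @Override
    void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        write(new Put(key, sum));
    }
}
```

If this is what is happening in the job, declaring concrete type parameters on the real reducer (for example TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>) and annotating reduce with @Override would let the compiler report a signature mismatch instead of the job failing at run time.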
