When I try to pass an IntWritable from my mapper to my reducer, I get the following error:
INFO mapreduce.Job: Task Id : attempt_1413976354988_0009_r_000000_1, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.hbase.client.Mutation
Here is my mapper:
public class testMapper extends TableMapper<Object, Object>
{
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException
    {
        try
        {
            // get the row key and convert it to a string
            String inKey = new String(rowKey.get());
            // build the new key, keeping only the date part
            String oKey = inKey.split("#")[0];
            // get the sales column as bytes first and then convert it to a
            // string (it was stored as a string from the hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = new String(bSales);
            Integer sales = new Integer(sSales);
            // emit the date as the key and the sales value as an IntWritable
            context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
And here is the reducer:
public class testReducer extends TableReducer<Object, Object, Object>
{
    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        try
        {
            int sum = 0;
            // loop through the different sales values and add them to the sum
            for (IntWritable sales : values)
            {
                Integer intSales = new Integer(sales.toString());
                sum += intSales;
            }
            // create an hbase put with the date as the row key
            Put insHBase = new Put(key.get());
            // add the sum value to the put
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write the put to the output hbase table
            context.write(null, insHBase);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
And the driver:
public class testDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // define the scan and the column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(testDriver.class);
        // define the input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, ImmutableBytesWritable.class, IntWritable.class, job);
        // define the output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);
        job.waitForCompletion(true);
    }
}
context.write(null, insHBase);

The problem is the type that ends up being written to the context versus what HBase expects. The output side of this job is an HBase table, which can only store a Mutation (a Put extends Mutation), yet it is being handed an IntWritable; that is exactly what the ClassCastException is saying.

You should write your outputs to the context and let HBase take care of storing them. The workflow with HBase is that you configure where the output goes in the job configuration (initTableReducerJob already does this in the driver) and then simply write the Put to the context; you should not do any manual writes to the table from inside the reducer.
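For reference, here is a minimal sketch of a reducer that follows this workflow. It assumes the same test2 output table and cf1:sum column from the question, and the 0.98-era Put.add API the question uses; the class name SumReducer and the explicit type parameters on TableReducer are mine, written out so the reduce signature matches the ImmutableBytesWritable/IntWritable pairs the mapper emits:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;

// sketch only: sums the IntWritable sales values for each date key and writes
// the total as a Put; the TableOutputFormat configured by initTableReducerJob
// in the driver takes care of storing it in the output table
public class SumReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>
{
    @Override
    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable sales : values)
        {
            sum += sales.get();
        }
        // row key is the date, column cf1:sum holds the total
        Put put = new Put(key.get());
        put.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
        // hand the Put to the framework; no manual table.put() is needed
        context.write(null, put);
    }
}

The driver wiring stays as in the question; initTableReducerJob is the only place the output table is named.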