Error while transferring values from one HBase table to another
INFO mapreduce.Job: Task Id : attempt_1410946588060_0019_r_0000000_2, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Result cannot be cast to org.apache.hadoop.hbase.client.Mutation
	at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:87)
	at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:576)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
	at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:645)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:405)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
My driver class:
Configuration conf = HBaseConfiguration.create();
// define scan and define column families to scan
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf1"));
// Job job = new Job(conf,"ExampleSummary");
Job job = Job.getInstance(conf);
job.setJarByClass(HBaseDriver.class);
//
// define input hbase table
TableMapReduceUtil.initTableMapperJob(
"test1",
scan,
HBaseMapper.class,
ImmutableBytesWritable.class,
Result.class,
job);
// define output table
TableMapReduceUtil.initTableReducerJob(
"test2",
HBaseReducer.class,
job);
job.waitForCompletion(true);
My mapper:
public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
throws IOException, InterruptedException {
try {
// get rowKey and convert it to string
String inKey = new String(rowKey.get());
// set new key having only date
String oKey = inKey.split("#")[0];
// get sales column in byte format first and then convert it to string (as it is stored as string from hbase shell)
byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
String sSales = new String(bSales);
Integer sales = new Integer(sSales);
// emit date and sales values
context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));
} catch (RuntimeException e) {
e.printStackTrace();
}
}
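The key-splitting and sales-parsing logic in the map step can be exercised outside Hadoop. This is a minimal sketch, assuming a composite row-key format like `2014-09-17#store42` (the part after `#` is a hypothetical example; only the `split("#")[0]` behavior comes from the mapper above):

```java
public class RowKeyDemo {
    // keep only the date portion of a composite row key (everything before '#')
    static String dateKey(String rowKey) {
        return rowKey.split("#")[0];
    }

    // parse a sales value that was stored as a string from the hbase shell
    static int parseSales(byte[] bSales) {
        return Integer.parseInt(new String(bSales));
    }

    public static void main(String[] args) {
        System.out.println(dateKey("2014-09-17#store42")); // 2014-09-17
        System.out.println(parseSales("135".getBytes()));  // 135
    }
}
```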
My reducer:
public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
try {
int sum = 0;
// loop through the different sales values and add them to sum
for (IntWritable sales : values) {
sum += sales.get();
}
// create hbase put with rowkey as date
Put insHBase = new Put(key.get());
// insert sum value to hbase
insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
// write data to Hbase table
context.write(null, insHBase);
} catch (Exception e) {
e.printStackTrace();
}
}
I found the solution. The fifth argument to initTableMapperJob declares the mapper's output value class; my mapper emits IntWritable, not Result, so I only needed to change
this:
TableMapReduceUtil.initTableMapperJob(
"test1",
scan,
HBaseMapper.class,
ImmutableBytesWritable.class,
Result.class,
job);
to this:
TableMapReduceUtil.initTableMapperJob(
"test1",
scan,
HBaseMapper.class,
ImmutableBytesWritable.class,
IntWritable.class,
job);
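With that change the declared types line up end to end. For reference, here is a sketch of the class declarations the driver assumes (the class names `HBaseMapper`/`HBaseReducer` come from the driver above; the method bodies are elided). `TableMapper<KEYOUT, VALUEOUT>` fixes the input types to `<ImmutableBytesWritable, Result>`, so only the map output types are declared, and `TableReducer` fixes its output value type to `Mutation` — which is why writing a `Result` into `TableOutputFormat` raised the ClassCastException:

```java
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;

// output types <ImmutableBytesWritable, IntWritable> must match the
// fourth and fifth arguments passed to initTableMapperJob
public class HBaseMapper
        extends TableMapper<ImmutableBytesWritable, IntWritable> {
    // map() emits <date, sales>
}

// TableReducer<KEYIN, VALUEIN, KEYOUT> extends
// Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>, so reduce() must write a
// Mutation subclass such as Put -- context.write(null, insHBase) above
class HBaseReducer
        extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
    // reduce() sums the sales and writes one Put per date
}
```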