I am new to Hadoop. I am using Hadoop 2.3.0 and HBase 0.98.3. I am trying to extract data from a text file and write it into an HBase table using MapReduce. Although I set the job's outputKeyClass and outputValueClass, I get a ClassCastException. Can anyone help me?
Here is my code:
public static void main(String[] args) {
    Configuration config = HBaseConfiguration.create();
    Job job;
    try {
        job = new Job(config, "LogBulkLoader");
        job.setJarByClass(Main.class);
        job.setMapperClass(LogMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "fatih");
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(userActionsTestFile));
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
public static class LogMapper extends
        TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            Context context) throws IOException, InterruptedException {
        try {
            String[] l = value.toString().split(",");
            String[] t = l[4].split(" ");
            String[] date = t[0].split("-");
            String[] time = t[1].split(":");
            GregorianCalendar gc = new GregorianCalendar(
                    Integer.parseInt(date[0]), Integer.parseInt(date[1]),
                    Integer.parseInt(date[2]), Integer.parseInt(time[0]),
                    Integer.parseInt(time[1]), Integer.parseInt(time[2]));
            Put put = new Put(Bytes.toBytes(l[0]));
            put.add(Bytes.toBytes("song"), Bytes.toBytes(l[1]),
                    gc.getTimeInMillis(), Bytes.toBytes(l[6]));
            put.add(Bytes.toBytes("album"), Bytes.toBytes(l[1]),
                    gc.getTimeInMillis(), Bytes.toBytes(l[5]));
            put.add(Bytes.toBytes("album"), Bytes.toBytes(l[2]),
                    gc.getTimeInMillis(), Bytes.toBytes(l[5]));
            put.add(Bytes.toBytes("singer"), Bytes.toBytes(l[1]),
                    gc.getTimeInMillis(), Bytes.toBytes(l[5]));
            put.add(Bytes.toBytes("singer"), Bytes.toBytes(l[2]),
                    gc.getTimeInMillis(), Bytes.toBytes(l[5]));
            put.add(Bytes.toBytes("singer"), Bytes.toBytes(l[3]),
                    gc.getTimeInMillis(), Bytes.toBytes(l[5]));
            context.write(new ImmutableBytesWritable(l[0].getBytes()), put);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
I get the following exception:
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:403)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
at com.argedor.module1.Main$LogMapper.map(Main.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Add the following code:
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
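Note also that the cast failure points at the mapper's input types, not just the job's output classes: TableMapper hard-wires its input key/value to ImmutableBytesWritable/Result, which only fits jobs that read from an HBase table. This job reads a plain text file through FileInputFormat, whose default TextInputFormat hands the mapper a LongWritable byte offset and a Text line, hence "LongWritable cannot be cast to ImmutableBytesWritable". A minimal sketch of a mapper signature that matches the text-file input (the Put-building logic stays exactly as in the question):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Extends the generic Mapper rather than TableMapper: the input types
// (LongWritable, Text) now match what TextInputFormat actually delivers.
public static class LogMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value is the raw CSV line from the input file.
        String[] l = value.toString().split(",");
        Put put = new Put(Bytes.toBytes(l[0]));
        // ... populate the Put exactly as in the original map() ...
        context.write(new ImmutableBytesWritable(Bytes.toBytes(l[0])), put);
    }
}

With the mapper declared this way, its output types agree with both the setMapOutputKeyClass/setMapOutputValueClass calls above and the existing setOutputKeyClass/setOutputValueClass calls, so the map-only job (setNumReduceTasks(0)) can write straight to TableOutputFormat.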