Cassandra Map Reduce for TimeUUID columns



我最近设置了4个节点的Cassandra集群,用于使用一个列族进行学习,该列族将时间序列数据保存为.

Key->{列名:timeUUID,列值:csv日志行,ttl:1year},我使用Netflix Astyanax java客户端加载了大约100万条日志行。

我还将Hadoop配置为使用1个namenode和4个datanode运行map reduce作业,以对Cassandra数据运行一些分析。

互联网上所有可用的例子都使用列名作为Hadoop作业配置的SlicePredicate,其中,由于我有timeUUID作为列,我如何有效地将Cassandra数据一次提供1000列的批次到Hadoop作业设置器。

此测试数据中某些行的列数超过10000,实际数据中的列数预计会更多。


我将我的作业配置为

public int run(String[] arg0) throws Exception {
Job job = new Job(getConf(), JOB_NAME);
Job.setJarByClass(LogTypeCounterByDate.class);
job.setMapperClass(LogTypeCounterByDateMapper.class);       
job.setReducerClass(LogTypeCounterByDateReducer.class);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
ConfigHelper.setRangeBatchSize(getConf(), 1000);

SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
ByteBuffer.wrap(new byte[0]), true, 1000);
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);

ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADRESS);
ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}

但我无法理解我是如何定义Mapper的,请您为Mapper类提供模板。

public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
{
private Text key = null;
private LongWritable value = null;
@Override
protected void setup(Context context){
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context){
//String[] lines = columns.;
}
}
ConfigHelper.setRangeBatchSize(getConf(), 1000)
...
SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(TimeUUID.asByteBuffer(startValue), TimeUUID.asByteBuffer(endValue), false, 1000))
ConfigHelper.setInputSlicePredicate(conf, predicate)

最新更新