使用map reduce在cassandra中执行批量加载



我没有太多使用cassandra的经验,所以如果我采用了错误的方法,请原谅。

我正试图用地图减少在卡桑德拉进行批量装载

基本上单词计数示例

参考:http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

我放了一个简单的Hadoop Wordcount Mapper示例,并根据上面的示例对驱动程序代码和reducer进行了轻微的修改。

我也成功地生成了输出文件。现在我的疑问是如何执行加载到卡桑德拉的部分?我的方法有什么不同吗

请提供建议。

这是驱动程序代码的一部分

 Job job = new Job();
 job.setJobName(getClass().getName());
 job.setJarByClass(CassaWordCountJob.class);
 Configuration conf = job.getConfiguration();
 conf.set("cassandra.output.keyspace", "test");
 conf.set("cassandra.output.columnfamily", "words");
 conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
 conf.set("cassandra.output.thrift.port","9160");    // default
 conf.set("cassandra.output.thrift.address", "localhost");
 conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
 job.setMapperClass(CassaWordCountMapper.class);
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(IntWritable.class);
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 job.setReducerClass(CassaWordCountReducer.class);
 FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra")); 
 MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
 return job.waitForCompletion(true) ? 0 : 1;

映射器与普通的wordcount映射器相同,它只是标记并发出Word,1

减速器等级采用形式

public class CassaWordCountReducer extends 
        Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Mutation> columnsToAdd = new ArrayList<Mutation>();
        Integer wordCount = 0;
        for(IntWritable value : values) {
            wordCount += value.get();
        }
        Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
        countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
        countCol.setTimestamp(new Date().getTime());
        ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
        wordCosc.setColumn(countCol);
        Mutation countMut = new Mutation();
        countMut.column_or_supercolumn = wordCosc;
        columnsToAdd.add(countMut);
        context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
    }
}

要批量加载到Cassandra中,我建议从DataStax中查看这篇文章。基本上你需要做2件事批量装载:

  • 您的输出数据无法原生地放入Cassandra,您需要将其转换为SSTables
  • 一旦你有了SSTables,你就需要能够将它们流式传输到Cassandra中。当然,您不只是想将每个SSTable复制到每个节点,而是只想将数据的相关部分复制到每个结点

在您使用BulkOutputFormat的情况下,它应该像在幕后使用sstableloader一样完成所有这些操作。我从未将它与MultipleOutputs一起使用过,但它应该可以正常工作。

我认为这种情况下的错误是您没有正确使用MultipleOutputs:您仍然在执行context.write,而实际上您应该向MultipleOutputs对象写入。按照您现在的操作方式,由于您正在向常规Context进行写入,它将由默认的TextOutputFormat输出格式接收,而不是您在MultipleOutputs中定义的格式。关于如何在减速器中使用MultipleOutputs的更多信息,请点击此处。

一旦您按照您定义的BulkOutputFormat的正确输出格式写入,您的SSTables应该会被创建,并从集群中的每个节点流式传输到Cassandra——您不需要任何额外的步骤,输出格式会为您处理它。

此外,我建议你看一下这篇文章,他们还解释了如何使用BulkOutputFormat,但他们使用的是ConfigHelper,你可能想看看它,以更容易地配置你的Cassandra端点。

相关内容

  • 没有找到相关文章

最新更新