我没有太多使用cassandra的经验,所以如果我采用了错误的方法,请原谅。
我正试图用地图减少在卡桑德拉进行批量装载
基本上单词计数示例
参考:http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/
我放了一个简单的Hadoop Wordcount Mapper示例,并根据上面的示例对驱动程序代码和reducer进行了轻微的修改。
我也成功地生成了输出文件。现在我的疑问是如何执行加载到卡桑德拉的部分?我的方法有什么不同吗
请提供建议。
这是驱动程序代码的一部分
Job job = new Job();
job.setJobName(getClass().getName());
job.setJarByClass(CassaWordCountJob.class);
Configuration conf = job.getConfiguration();
conf.set("cassandra.output.keyspace", "test");
conf.set("cassandra.output.columnfamily", "words");
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160"); // default
conf.set("cassandra.output.thrift.address", "localhost");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
job.setMapperClass(CassaWordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(CassaWordCountReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra"));
MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
return job.waitForCompletion(true) ? 0 : 1;
映射器与普通的wordcount映射器相同,它只是标记并发出Word,1
减速器等级采用形式
public class CassaWordCountReducer extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Mutation> columnsToAdd = new ArrayList<Mutation>();
Integer wordCount = 0;
for(IntWritable value : values) {
wordCount += value.get();
}
Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
countCol.setTimestamp(new Date().getTime());
ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
wordCosc.setColumn(countCol);
Mutation countMut = new Mutation();
countMut.column_or_supercolumn = wordCosc;
columnsToAdd.add(countMut);
context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
}
}
要批量加载到Cassandra中,我建议从DataStax中查看这篇文章。基本上你需要做2件事批量装载:
- 您的输出数据无法原生地放入Cassandra,您需要将其转换为SSTables
- 一旦你有了SSTables,你就需要能够将它们流式传输到Cassandra中。当然,您不只是想将每个SSTable复制到每个节点,而是只想将数据的相关部分复制到每个结点
在您使用BulkOutputFormat
的情况下,它应该像在幕后使用sstableloader
一样完成所有这些操作。我从未将它与MultipleOutputs
一起使用过,但它应该可以正常工作。
我认为这种情况下的错误是您没有正确使用MultipleOutputs
:您仍然在执行context.write
,而实际上您应该向MultipleOutputs
对象写入。按照您现在的操作方式,由于您正在向常规Context
进行写入,它将由默认的TextOutputFormat
输出格式接收,而不是您在MultipleOutputs
中定义的格式。关于如何在减速器中使用MultipleOutputs
的更多信息,请点击此处。
一旦您按照您定义的BulkOutputFormat
的正确输出格式写入,您的SSTables应该会被创建,并从集群中的每个节点流式传输到Cassandra——您不需要任何额外的步骤,输出格式会为您处理它。
此外,我建议你看一下这篇文章,他们还解释了如何使用BulkOutputFormat
,但他们使用的是ConfigHelper
,你可能想看看它,以更容易地配置你的Cassandra端点。