我正在编写map reduce程序来查询cassandra列族。我只需要从一个列族中读取行的子集(使用行键)。我有一套我必须阅读的行的行键。我如何将"行密钥集"传递给map reduce作业,以便它只能输出来自cassandra columnbfamily的行的子集?
摘要:
enter code here
class GetRows()
{
public set<String> getRowKeys()
{
logic.....
return set<string>;
}
}
class MapReduceCassandra()
{
inputformat---columnFamilyInputFormat
.
;
also need input key-set .. How to get it?
}
有人能建议从java应用程序调用mapreduce的最佳方法吗?以及如何将一组键传递给mapreduce?
从Java调用map reduce
要做到这一点,您可以使用来自org.apache.hadoop.mapreduce
名称空间的类(您可以使用非常类似的方法使用较旧的mapred
,只需检查java应用程序中的API文档):
Job job = Job.getInstance(new Configuration());
// configure job: set input and output types and directories, etc.
job.setJarByClass(MapReduceCassandra.class);
job.submit();
将数据传递到mapreduce作业
如果您的行键集非常小,您可以将其序列化为字符串,并将其作为配置参数传递:
job.getConfiguration().set("CassandraRows", getRowsKeysSerialized()); // TODO: implement serializer
//...
job.submit();
在作业中,您将能够通过上下文对象访问参数:
public void map(
IntWritable key, // your key type
Text value, // your value type
Context context
)
{
// ...
String rowsSerialized = context.getConfiguration().get("CassandraRows");
String[] rows = deserializeRows(rowsSerialized); // TODO: implement deserializer
//...
}
但是,如果您的集合可能是无界的,那么将其作为参数传递是个坏主意。相反,您应该在文件中传递密钥,并利用分布式缓存。然后你可以在提交作业之前将这一行添加到上面的部分:
job.addCacheFile(new Path(pathToCassandraKeySetFile).toUri());
//...
job.submit();
在作业内部,您可以通过上下文对象访问此文件:
public void map(
IntWritable key, // your key type
Text value, // your value type
Context context
)
{
// ...
URI[] cacheFiles = context.getCacheFiles();
// find, open and read your file here
// ...
}
注意:所有这些都是为新API(org.apache.hadoop.mapreduce
)准备的。如果您使用的是org.apache.hadoop.mapred
,则方法非常相似,但会在不同的对象上调用一些相关的方法。