我想编写Java程序,它从HDFS读取输入,使用MapReduce处理它并将输出写入MongoDb。
场景如下:
- 我有一个Hadoop集群,它有3个数据节点。
- Java程序从HDFS读取输入,使用MapReduce对其进行处理。
- 最后,将结果写入 MongoDb。
实际上,从HDFS读取并使用MapReduce处理它很简单。但是我陷入了将结果写入MongoDb的困境。是否有任何Java API支持将结果写入MongoDB?另一个问题是,由于它是一个Hadoop集群,所以我们不知道哪个数据节点将运行Reducer任务并生成结果,是否可以将结果写入安装在特定服务器上的MongoDb?
如果我想将结果写入 HDFS,代码将是这样的:
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
{
long sum = 0;
for (LongWritable value : values)
{
sum += value.get();
}
context.write(new Text(key), new LongWritable(sum));
}
现在我想将结果写入MongoDb而不是HDFS,我该怎么做?
你想要 «MongoDB Connector for Hadoop»。这些例子。
在Reducer中添加代码是很诱人的,作为副作用,将数据插入到数据库中。避免这种诱惑。使用连接器而不是仅仅插入数据作为化简器类的副作用的一个原因是推测执行:Hadoop有时可以并行运行两个完全相同的reduce任务,这可能导致无关的插入和重复的数据。
是的。你像往常一样写信给蒙戈。您的 mongo db 设置为在分片上运行这一事实对您隐藏了一个细节。
我花了一上午的时间来实现相同的场景。这是我的解决方案:
创建三个类:
- 实验.java:用于作业配置和提交
- MyMap.java:映射器类
-
MyReduce.java:减速器类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.mongodb.hadoop.io.BSONWritable; import com.mongodb.hadoop.mapred.MongoOutputFormat; public class Experiment extends Configured implements Tool{ public int run(final String[] args) throws Exception { final Configuration conf = getConf(); conf.set("mongo.output.uri", args[1]); final JobConf job = new JobConf(conf); FileInputFormat.setInputPaths(job, new Path(args[0])); job.setJarByClass(Experiment.class); job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputFormat(MongoOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BSONWritable.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); JobClient.runJob(job); return 0; } public static void main(final String[] args) throws Exception{ int res = ToolRunner.run(new TweetPerUserToMongo(), args); System.exit(res); } }
从群集运行实验类时,将输入两个参数。第一个参数是来自HDFS位置的输入源,第二个参数是指将保留结果的mongodb URI。下面是一个示例调用。假设您的实验.java在包名称 org.example 下。
sudo -u hdfs hadoop jar ~/jar/myexample.jar org.example.Experiment myfilesinhdfs/* mongodb://192.168.0.1:27017/mydbName.myCollectionName
这可能不是最好的方法,但它为我完成了这项工作。