I'm new to big data and NoSQL, and I'm writing a sample program.
I'm trying to fetch details from my Mongo database, which holds documents like this:
{ "_id" : ObjectId("51d11c95e82449edcf7640bc"), "Called_Number" : NumberLong("7259400112"), "Calling_Number" : NumberLong("9008496311"), "Date" : "22-Apr-13", "Time" : "10:21:43", "Duration" : "4:36" }
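Now I'm trying to read these values from the database and run a MapReduce job so that I can find details like the following:
{"Calling_Number": 7259400112, "Called_Number": "9008496311", "Frequency": "3"}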
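A plain-Java sketch (no Hadoop, so it runs on its own) of what the job is meant to compute: the map step emits one (pair, 1) entry per call record, and the reduce step sums those entries per (calling, called) pair. The Call class and the comma-joined string key are assumptions made for illustration; they are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Hadoop) of the intended MapReduce job:
// map emits ((calling, called), 1) per call record, reduce sums per pair.
public class PairFrequency {

    // ASSUMPTION: hypothetical minimal call record holding only the two fields the job needs.
    static final class Call {
        final long calling;
        final long called;

        Call(long calling, long called) {
            this.calling = calling;
            this.called = called;
        }
    }

    // "Map" phase: one (key, 1) entry per record; the key joins both numbers with a comma.
    static List<Map.Entry<String, Integer>> map(List<Call> calls) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Call c : calls) {
            out.add(Map.entry(c.calling + "," + c.called, 1));
        }
        return out;
    }

    // "Shuffle + reduce": group entries by key and sum their counts.
    // TreeMap keeps keys sorted, loosely mirroring Hadoop's sort before reduce.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> mapped) {
        Map<String, Integer> freq = new TreeMap<>();
        for (Map.Entry<String, Integer> e : mapped) {
            freq.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return freq;
    }

    public static void main(String[] args) {
        List<Call> calls = List.of(
                new Call(9008496311L, 7259400112L),
                new Call(9008496311L, 7259400112L),
                new Call(9008496311L, 7259400112L),
                new Call(7259400112L, 9008496311L));
        Map<String, Integer> freq = reduce(map(calls));
        freq.forEach((pair, n) -> System.out.println(pair + " -> " + n));
    }
}
```

In the real job, Hadoop's shuffle does the grouping, so the mapper only has to emit the pair and a count of 1.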
Here is what I'm trying:
package callcircle;
import java.io.*;
import java.util.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.bson.*;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.*;
public class call {
    private static final Log log = LogFactory.getLog(call.class);
    public static class TokenizerMapper extends
            Mapper<Object, Object, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();
        public void map(Object calling_number, Object called_number,
                Context context) throws IOException, InterruptedException {
            System.out.println("entering method");
            // calling_number = (Object) calling_number).get("Calling_Number");
            called_number = ((BSONWritable) called_number).get("Called_Number");
            String CallNumer01 = called_number.toString();
            String[] recips = CallNumer01.split(",");
            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {
                    // context.write(new CallPair(calling_number, recip), new IntWritable(1));
                    // word.set(CallNumer01); context.write( word, one );
                    //System.out.println("After mapping");
                }
            }
        }
    }
    public class CallReducer extends
            Reducer<CallPair, IntWritable, BSONWritable, IntWritable> {
        public void reduce(final CallPair pKey,
                final Iterable<IntWritable> pValues, final Context pContext)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable value : pValues) {
                sum += value.get();
            }
            @SuppressWarnings("static-access")
            BSONObject outDoc = new BasicDBObjectBuilder().start()
                    .add("f", pKey.calling_number).add("t", pKey.called_number)
                    .get();
            BSONWritable pkeyOut = new BSONWritable(outDoc);
            pContext.write(pkeyOut, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        final Configuration conf = new Configuration();
        System.out.println("Conf1: " + conf);
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost/CDR.in1");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost/CDR.out");
        System.out.println("Conf: " + conf);
        final Job job = new Job(conf, "CDR");
        job.setJarByClass(call.class);
        System.out.println("Conf2: " + conf);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(CallReducer.class);
        job.setReducerClass(CallReducer.class);
        System.out.println("Conf3: " + conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("Conf3: " + conf);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.out.println("Conf4: " + conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.out.println("Conf6: " + conf);
    }
}
But I get the following error:
In Main
Conf1: Configuration: core-default.xml, core-site.xml
Conf: Configuration: core-default.xml, core-site.xml
13/07/01 19:04:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
Conf2: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf4: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
13/07/01 19:04:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/07/01 19:04:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/07/01 19:04:28 INFO util.MongoSplitter: Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
13/07/01 19:04:28 INFO mapred.JobClient: Running job: job_local_0001
13/07/01 19:04:28 INFO util.MongoSplitter: Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
should setup context
13/07/01 19:04:28 INFO input.MongoInputSplit: Deserialized MongoInputSplit ... { length = 9223372036854775807, locations = [localhost], keyField = _id, query = { "$query" : { }}, fields = { }, sort = { }, limit = 0, skip = 0, noTimeout = false}
13/07/01 19:04:28 INFO mapred.MapTask: io.sort.mb = 100
13/07/01 19:04:28 INFO mapred.MapTask: data buffer = 79691776/99614720
13/07/01 19:04:28 INFO mapred.MapTask: record buffer = 262144/327680
entering method
13/07/01 19:04:28 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.hadoop.io.BSONWritable
at callcircle.call$TokenizerMapper.map(call.java:36)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
13/07/01 19:04:29 INFO mapred.JobClient: map 0% reduce 0%
13/07/01 19:04:29 INFO mapred.JobClient: Job complete: job_local_0001
13/07/01 19:04:29 INFO mapred.JobClient: Counters: 0
Can anyone point out where I'm going wrong?
Thanks
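If the mapper and the reducer do not use the same output types, you must explicitly specify the mapper's output key/value types, so you may also need to add:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);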
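The question's mapper (commented out) and reducer also refer to a CallPair key class that is never shown. If the mapper emits such a composite key instead of Text, the map output key class would be CallPair.class rather than Text.class, and for Hadoop the class has to implement WritableComparable. Below is a hedged plain-Java sketch of only the parts that matter for grouping (compareTo, equals, hashCode). The Hadoop write/readFields methods are deliberately left out so it compiles without Hadoop, and the long field types are an assumption; the field names just match the question's reducer.

```java
import java.util.Objects;

// Sketch of a composite key for (calling number, called number).
// ASSUMPTION: the question never shows CallPair; fields are assumed to be longs and are
// named to match the question's reducer (pKey.calling_number, pKey.called_number).
// A real Hadoop key would also implement org.apache.hadoop.io.WritableComparable
// (write/readFields); that part is omitted so this compiles without Hadoop.
public class CallPair implements Comparable<CallPair> {
    final long calling_number;
    final long called_number;

    CallPair(long calling_number, long called_number) {
        this.calling_number = calling_number;
        this.called_number = called_number;
    }

    // Sort by calling number, then called number, so identical pairs end up adjacent.
    @Override
    public int compareTo(CallPair other) {
        int c = Long.compare(calling_number, other.calling_number);
        return c != 0 ? c : Long.compare(called_number, other.called_number);
    }

    // equals/hashCode must agree with compareTo; Hadoop's default HashPartitioner
    // uses hashCode() to pick the reducer for each key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CallPair)) return false;
        CallPair p = (CallPair) o;
        return calling_number == p.calling_number && called_number == p.called_number;
    }

    @Override
    public int hashCode() {
        return Objects.hash(calling_number, called_number);
    }

    @Override
    public String toString() {
        return calling_number + " -> " + called_number;
    }

    public static void main(String[] args) {
        CallPair a = new CallPair(9008496311L, 7259400112L);
        CallPair b = new CallPair(9008496311L, 7259400112L);
        // Two records for the same pair compare equal, so they reach the same reduce call.
        System.out.println(a + " equals " + b + ": " + (a.equals(b) && a.compareTo(b) == 0));
    }
}
```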