Mongo-Hadoop MapReduce job throws an error



I am new to big data and NoSQL, and I am writing a sample program.

I am trying to read records like the following from my MongoDB database:

  { "_id" : ObjectId("51d11c95e82449edcf7640bc"), "Called_Number" : NumberLong("7259400112"), "Calling_Number" : NumberLong("9008496311"), "Date" : "22-Apr-13", "Time" : "10:21:43", "Duration" : "4:36" }

Now I want to read these values from the database and run a MapReduce job over them, so that I can get details such as the following:

{"Calling_Number": 7259400112, "Called_Number": "9008496311", "Frequency": "3"}

Here is what I am trying:

package callcircle;
import java.io.*;
import java.util.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.bson.*;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.*;
public class call {
    private static final Log log = LogFactory.getLog(call.class);
    public static class TokenizerMapper extends
            Mapper<Object, Object, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        public void map(Object calling_number, Object called_number,
                Context context) throws IOException, InterruptedException {
            System.out.println("entering method");

        //  calling_number = (Object) calling_number).get("Calling_Number");
            called_number = ((BSONWritable) called_number).get("Called_Number");
            String CallNumer01 = called_number.toString();
            String[] recips = CallNumer01.split(",");

            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {

                    // context.write(new CallPair(calling_number, recip), new IntWritable(1));
                    // word.set(CallNumer01); context.write( word, one );
                    //System.out.println("After mapping");
                }
            }
        }
    }
    public class CallReducer extends
        Reducer<CallPair, IntWritable, BSONWritable, IntWritable> {
        public void reduce(final CallPair pKey,
                final Iterable<IntWritable> pValues, final Context pContext)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable value : pValues) {
                sum += value.get();
            }
            @SuppressWarnings("static-access")
            BSONObject outDoc = new BasicDBObjectBuilder().start()
                    .add("f", pKey.calling_number).add("t", pKey.called_number)
                    .get();
            BSONWritable pkeyOut = new BSONWritable(outDoc);
            pContext.write(pkeyOut, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        final Configuration conf = new Configuration();
        System.out.println("Conf1: " + conf);
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost/CDR.in1");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost/CDR.out");
        System.out.println("Conf: " + conf);
        final Job job = new Job(conf, "CDR");
        job.setJarByClass(call.class);
        System.out.println("Conf2: " + conf);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(CallReducer.class);
        job.setReducerClass(CallReducer.class);
        System.out.println("Conf3: " + conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("Conf3: " + conf);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.out.println("Conf4: " + conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.out.println("Conf6: " + conf);
    }
}

But I get the following error:

In Main
Conf1: Configuration: core-default.xml, core-site.xml
Conf: Configuration: core-default.xml, core-site.xml
13/07/01 19:04:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
Conf2: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf4: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
13/07/01 19:04:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/07/01 19:04:27 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/07/01 19:04:28 INFO util.MongoSplitter:  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
13/07/01 19:04:28 INFO mapred.JobClient: Running job: job_local_0001
13/07/01 19:04:28 INFO util.MongoSplitter:  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
should setup context
13/07/01 19:04:28 INFO input.MongoInputSplit: Deserialized MongoInputSplit ... { length = 9223372036854775807, locations = [localhost], keyField = _id, query = { "$query" : { }}, fields = { }, sort = { }, limit = 0, skip = 0, noTimeout = false}
13/07/01 19:04:28 INFO mapred.MapTask: io.sort.mb = 100
13/07/01 19:04:28 INFO mapred.MapTask: data buffer = 79691776/99614720
13/07/01 19:04:28 INFO mapred.MapTask: record buffer = 262144/327680
entering method
13/07/01 19:04:28 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.hadoop.io.BSONWritable
    at callcircle.call$TokenizerMapper.map(call.java:36)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
13/07/01 19:04:29 INFO mapred.JobClient:  map 0% reduce 0%
13/07/01 19:04:29 INFO mapred.JobClient: Job complete: job_local_0001
13/07/01 19:04:29 INFO mapred.JobClient: Counters: 0

Can someone point out where I am going wrong?

Thanks

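If the mapper and the reducer do not use the same output types, you have to specify the mapper's key/value types explicitly, so you probably also need to add: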

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
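
To make the intended result concrete, the per-pair counting the question is after can be sketched in plain Java, with no Hadoop or MongoDB on the classpath. This is only an illustration under assumptions: the class `CallPairCount` and its methods are hypothetical names, each record is modeled as a `Map<String, Object>` standing in for the BSON document, and the map and reduce phases are collapsed into one in-memory pass.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the job: counts how often each
// (Calling_Number, Called_Number) pair occurs.
public class CallPairCount {

    // "Map" step: build one "calling,called" key per record.
    public static String pairKey(Map<String, Object> record) {
        return record.get("Calling_Number") + "," + record.get("Called_Number");
    }

    // "Reduce" step: add 1 to the running total for every key seen.
    public static Map<String, Integer> countPairs(List<Map<String, Object>> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map<String, Object> record : records) {
            counts.merge(pairKey(record), 1, Integer::sum);
        }
        return counts;
    }

    // Helper that builds a record with only the two fields used here.
    public static Map<String, Object> record(long calling, long called) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("Calling_Number", calling);
        r.put("Called_Number", called);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> records = new ArrayList<>();
        records.add(record(9008496311L, 7259400112L));
        records.add(record(9008496311L, 7259400112L));
        records.add(record(9008496311L, 7259400112L));
        records.add(record(7259400112L, 9008496311L));
        System.out.println(countPairs(records));
    }
}
```

In the real job the mapper would write the same key as a `Text` with an `IntWritable` of 1, and the reducer would sum the values. Separately, the stack trace in the question shows the input value arriving as `com.mongodb.BasicDBObject`. Since that class implements `org.bson.BSONObject`, casting the value to `BSONObject` rather than `BSONWritable` is probably what the mapper needs, though that has not been tested against this setup.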
