How to pass the URI and database to MongoDB in Spark



In the code below I try to pass the Mongo URI and database to ReadConfig through an options map, but it fails with an error saying the URI or database cannot be found.

`

public JavaMongoRDD<Document> getRDDFromDS(DataSourceInfo ds, String collectionName){
        String mongoDBURI = "mongodb://"
                + PropertiesFileEncryptorUtil.decryptData(ds.getDbUsername()) + ":"
                + PropertiesFileEncryptorUtil.decryptData(ds.getDbPassword()) + "@"
                + ds.getHostName() + ":" + ds.getPort();
        Map<String, String> readOverrides = new HashMap<String, String>();
        readOverrides.put("uri", mongoDBURI);
        readOverrides.put("database", ds.getDbName());
        readOverrides.put("collection", collectionName);
        readOverrides.put("partitioner", mongoDBInputPartitioner);
        readOverrides.put("partitionKey", mongoDBPartitionKey);
        readOverrides.put("partitionSizeMB", mongoDBInputPartitionSize);
        ReadConfig readConf = ReadConfig.create(jsc).withOptions(readOverrides);
        JavaMongoRDD<Document> readRdd = MongoSpark.load(jsc, readConf);
        return readRdd;
    }`

What is the correct way to pass the URI and database? Thanks in advance.

You can pass the configuration parameters to Spark through the conf variable:

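import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._                   // implicits: loadFromMongoDB, saveToMongoDB
import com.mongodb.spark.config.ReadConfig

// The database and collection are part of each URI: mongodb://host/database.collection
val conf = new SparkConf().setAppName("YourAppName").setMaster("local[2]").set("spark.executor.memory","1g")
      .set("spark.app.id","YourSparkId")
      .set("spark.mongodb.input.uri","mongodb://127.0.0.1/yourdatabase.yourInputcollection?readPreference=primaryPreferred")
      .set("spark.mongodb.output.uri","mongodb://127.0.0.1/yourdatabase.yourOutputcollection")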
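Since the question is written in Java and builds the URI from credentials, here is a minimal sketch of how a value for spark.mongodb.input.uri could be assembled so that it also carries the database and collection. The class MongoUriSketch and its methods are hypothetical helpers, not part of the connector, and the sample user, password, host and port are placeholders. It assumes the credentials may contain characters such as '@' or ':', which need percent-encoding inside a MongoDB connection string.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of the MongoDB Spark connector.
final class MongoUriSketch {

    // Percent-encodes a credential so characters such as '@', ':' or '/' cannot break the URI.
    static String encode(String value) {
        try {
            // URLEncoder emits '+' for a space; a connection string expects %20 instead.
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    // Builds mongodb://user:password@host:port/database.collection
    static String buildInputUri(String user, String password, String host, int port,
                                String database, String collection) {
        return "mongodb://" + encode(user) + ":" + encode(password) + "@"
                + host + ":" + port + "/" + database + "." + collection;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        System.out.println(buildInputUri("appUser", "p@ss:word", "127.0.0.1", 27017,
                "yourdatabase", "yourInputcollection"));
    }
}
```

The resulting string can be set as spark.mongodb.input.uri in the same way as the literal URIs above. When credentials are present, the database in the path is also used as the authentication database unless an authSource option is added to the URI.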

After that, you need to supply the conf variable to the Spark context:

val sc = new SparkContext(conf)
val readConf = ReadConfig( sc )

Then you can read values from Mongo:

val rdd = sc.loadFromMongoDB( readConfig = readConf )
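For the Java code in the question, one possible explanation (an assumption, not confirmed by the source) is that ReadConfig.create(jsc) already looks for spark.mongodb.input.uri in the Spark configuration, so it fails before withOptions is ever applied. A hedged alternative is to build the complete options map first and create the ReadConfig from that map. The sketch below only builds the map with the standard library; ReadOverridesSketch is a hypothetical name, and the connector calls in the comments reflect my understanding of the 2.x Java API and should be checked against the version in use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects the read options from the question into one map.
final class ReadOverridesSketch {

    static Map<String, String> readOverrides(String uri, String database, String collection) {
        Map<String, String> options = new HashMap<>();
        options.put("uri", uri);               // connection string, e.g. mongodb://host:port
        options.put("database", database);     // database to read from
        options.put("collection", collection); // collection to read from
        return options;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, String> options =
                readOverrides("mongodb://127.0.0.1:27017", "yourdatabase", "yourInputcollection");
        System.out.println(options);
        // Assumption: with the connector on the classpath, the map would be used as
        //   ReadConfig readConf = ReadConfig.create(options);
        //   JavaMongoRDD<Document> readRdd = MongoSpark.load(jsc, readConf);
    }
}
```

The partitioner options from the question could be added to the same map before the ReadConfig is created.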

And save them like this:

rdd.map( someMapFunction ).saveToMongoDB()

I hope my answer helps.
