Populating data from MongoDB into Spark using Java



I want to write a Java class that reads an index from MongoDB into an SQLContext, so that I have a dataset to process in Spark. My code is as follows:

SparkConf conf = new SparkConf().setAppName("Aggregation").setMaster("local");
conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
conf.set("mongo.input.uri", uri);
conf.set("mongo.output.uri", uri);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Read options for the Stratio MongoDB datasource
Map<String, String> options = new HashMap<String, String>();
options.put("host", mongoHost + ":27017");
options.put("inferSchema", "true");
options.put("database", database);
Dataset<Row> df = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).option("collection", "kpi_aggregator").load();

I am using the following Maven dependencies:

        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- MongoDB -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.0.0-rc0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.stratio.datasource</groupId>
            <artifactId>spark-mongodb_2.11</artifactId>
            <version>0.12.0</version>
        </dependency>

However, when I execute the code, I get the following exception:

Caused by: java.lang.IllegalArgumentException: state should be: w >= 0
    at com.mongodb.assertions.Assertions.isTrueArgument(Assertions.java:99)
    at com.mongodb.WriteConcern.<init>(WriteConcern.java:316)
    at com.mongodb.WriteConcern.<init>(WriteConcern.java:227)
    at com.mongodb.casbah.WriteConcern$.<init>(WriteConcern.scala:41)
    at com.mongodb.casbah.WriteConcern$.<clinit>(WriteConcern.scala)
    ... 43 more

Any idea what the problem is?

I managed to fix this by removing the com.stratio.datasource dependency and changing my code as shown below. Judging by the stack trace, the problem appears to be a version conflict: the Stratio connector pulls in Casbah, whose WriteConcern initialization violates the w >= 0 assertion introduced in the 3.x mongo-java-driver.

SparkConf conf = new SparkConf().setAppName("Aggregation").setMaster("local");
// Leftover mongo-hadoop settings; the MongoDB Spark connector reads its own spark.mongodb.* keys
conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
conf.set("mongo.input.uri", uri);
conf.set("mongo.output.uri", uri);
conf.set("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Connection details are passed as read options instead of the Stratio host/database keys
Map<String, String> options = new HashMap<String, String>();
options.put("uri", uri);
options.put("database", database);
Dataset<Row> df = MongoSpark.read(sqlContext).options(options).option("collection", "kpi_aggregator").load();
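
Once the Dataset is loaded it can be queried like any other Spark DataFrame. A minimal sketch of an aggregation over it follows; the metric and value column names are hypothetical placeholders for whatever fields kpi_aggregator actually contains:

// Register the collection as a temporary view so it can be queried with Spark SQL.
df.createOrReplaceTempView("kpi_aggregator");
// "metric" and "value" are assumed column names; adjust them to the real schema.
Dataset<Row> result = sqlContext.sql(
        "SELECT metric, AVG(value) AS avg_value FROM kpi_aggregator GROUP BY metric");
result.show();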
