I want to write a Java class that reads an index from MongoDB into an SQLContext, so I can get a dataset to process in Spark. My code is as follows:
SparkConf conf = new SparkConf().setAppName("Aggregation").setMaster("local");
conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
conf.set("mongo.input.uri", uri);
conf.set("mongo.output.uri", uri);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("host", mongoHost +":27017");
options.put("inferSchema", "true");
options.put("database", database);
Dataset<Row> df = sqlContext.read()
        .format("com.stratio.datasource.mongodb")
        .options(options)
        .option("collection", "kpi_aggregator")
        .load();
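The intent is then to query df with plain Spark SQL; a minimal sketch of that step (kpi and value are hypothetical column names, the real schema would come from inferSchema):
// Sketch only: register the loaded collection and run an aggregation over it.
// "kpi" and "value" are placeholder column names.
df.createOrReplaceTempView("kpi_aggregator");
Dataset<Row> result = sqlContext.sql(
        "SELECT kpi, avg(value) AS avg_value FROM kpi_aggregator GROUP BY kpi");
result.show();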
I used the following dependencies in Maven:
<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- MongoDB -->
<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.0.0-rc0</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>com.stratio.datasource</groupId>
    <artifactId>spark-mongodb_2.11</artifactId>
    <version>0.12.0</version>
</dependency>
However, when I execute the code I get the following exception:
Caused by: java.lang.IllegalArgumentException: state should be: w >= 0
at com.mongodb.assertions.Assertions.isTrueArgument(Assertions.java:99)
at com.mongodb.WriteConcern.<init>(WriteConcern.java:316)
at com.mongodb.WriteConcern.<init>(WriteConcern.java:227)
at com.mongodb.casbah.WriteConcern$.<init>(WriteConcern.scala:41)
at com.mongodb.casbah.WriteConcern$.<clinit>(WriteConcern.scala)
... 43 more
Do you know what the problem is?
I managed to fix this by removing the com.stratio.datasource dependency and changing my code as follows. Judging from the stack trace, the Casbah classes pulled in by spark-mongodb_2.11 seem to construct a WriteConcern that the 3.x mongo-java-driver rejects (its assertion requires w >= 0), so dropping that connector and reading through mongo-spark-connector directly avoids the conflict:
SparkConf conf = new SparkConf().setAppName("Aggregation").setMaster("local");
conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
conf.set("mongo.input.uri", uri);
conf.set("mongo.output.uri", uri);
// Partitioner used by the MongoDB Spark connector when splitting the collection.
conf.set("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("uri", uri);
options.put("database", database);
// Load the collection through the official MongoDB Spark connector instead of the Stratio datasource.
Dataset<Row> df = MongoSpark.read(sqlContext)
        .options(options)
        .option("collection", "kpi_aggregator")
        .load();
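With this version the load completes, and the resulting Dataset<Row> behaves like any other Spark dataset; a minimal sanity-check sketch (nothing here is connector-specific):
// Quick sanity checks on the loaded collection.
df.printSchema();
System.out.println("Documents loaded: " + df.count());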