我正在尝试将MongoDb中的数据提取到我的DF中。使用Java Spark
以下是示例代码:-
SparkConf conf = new SparkConf()
.setAppName("MongoSparkConnectorTour")
.setMaster("local")
.set("spark.app.id", "MongoSparkConnectorTour")
.set("spark.mongodb.input.uri", uri)
.set("sampleSize", args[2])
.set("spark.mongodb.output.uri", uri)
.set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
.set("spark.mongodb.input.partitionerOptions.numberOfPartitions", "64")
JavaSparkContext jsc = new JavaSparkContext(conf)
DataFrame df = MongoSpark.load(jsc).toDF();
System.out.println("DF Count - " + df.count());
df.printSchema();
有2个表,1个表我可以在没有任何问题的情况下获取数据,但对于另一个表,我遇到了以下问题-
20/07/15 14:17:31 ERROR Executor: Exception in task 1.0 in stage 3.0 (TID 4)
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a NullType (value: BsonString{value='4492148'})
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:80)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:38)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:36)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:36)
at com.mongodb.spark.sql.MapFunctions$.castToStructType(MapFunctions.scala:109)
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:75)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:38)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:36)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:36)
at com.mongodb.spark.sql.MapFunctions$.castToStructType(MapFunctions.scala:109)
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:75)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:38)
从谷歌,我看到的唯一解决方案是增加样本量,但它仍然不起作用。
堆栈溢出导致强制转换失败增加样本量
第二个表的容量稍高,我尝试了更大的样本量,但仍然失败。
解决这一问题的任何其他建议或想法都将是有用的。
我正在使用胶水,在从documentDB(Mongo API(加载数据时遇到了同样的问题。我在阅读时做了以下更改以解决此错误。
df=spark.read.format("mongo").option("ssl.domain_match","false").option("replaceDocument" , "false").option("database","mydatabase").option("collection", "mycollection").option("uri","myurl").load(schema=new_schema, inferSchema=False)
请注意:我已经尝试过使用"传递inferSchema和schema参数;。option(("也一样,但没有奏效。当我在";加载((";正如示例代码中给出的那样,它起作用了。