由以下原因引起:org.bson.BsonInvalidOperationException:无效状态 INITIAL



互联网上有几个类似的问题,但没有人有答案。

我使用以下代码将 mongo 数据保存到 Hive,但出现异常,如最后所示。我会问如何解决这个问题

我正在使用

  • Spark-mongo-connector (Spark 2.1.0 - Scala 2.11(

  • java-mongo-driver 3.10.2

    import com.mongodb.spark.MongoSpark
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType
    object MongoConnector_Test {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().set("spark.mongodb.input.uri", "mongodb://user:pass@mongo1:123456/db1.t1").setMaster("local[4]").setAppName("MongoConnectorTest")
        val session = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        val schema: StructType = new StructType().add("_id", "string").add("x", "string").add("y", "string").add("z", "string")//
        val df = MongoSpark.read(session).schema(schema).load()
        df.write.saveAsTable("MongoConnector_Test" + System.currentTimeMillis())
      }
    }
    

但是,会发生以下异常。

Caused by: org.bson.BsonInvalidOperationException: Invalid state INITIAL
    at org.bson.json.StrictCharacterStreamJsonWriter.checkState(StrictCharacterStreamJsonWriter.java:395)
    at org.bson.json.StrictCharacterStreamJsonWriter.writeNull(StrictCharacterStreamJsonWriter.java:192)
    at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:24)
    at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:21)
    at org.bson.json.JsonWriter.doWriteNull(JsonWriter.java:206)
    at org.bson.AbstractBsonWriter.writeNull(AbstractBsonWriter.java:557)
    at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:38)
    at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:28)
    at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
    at org.bson.codecs.BsonValueCodec.encode(BsonValueCodec.java:62)
    at com.mongodb.spark.sql.BsonValueToJson$.apply(BsonValueToJson.scala:29)
    at com.mongodb.spark.sql.MapFunctions$.bsonValueToString(MapFunctions.scala:103)
    at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:78)
    at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
    at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:37)
    at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
    at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
    ... 8 more

Mongo 将数据存储在文档中,并且并非所有文档的架构都是固定的。因此,请注意,由于元数据可能为空,这种情况可能是您的问题的根本原因。让我们忽略某些字段在某些文档中不可用,问题将得到解决。

实际上,架构未命中匹配应该是实际问题。

Mongo DB 具有灵活的模式类型,因此即使您没有字段和值为 null,它也可以工作。但不应出现架构不匹配。

例如,在多个集合中,相同的结构/列不应具有子类型的遗漏和匹配。

我们遇到了这个问题,经过多次检查,我们终于可以通过更正集合的模式来解决它,从字符串到布尔值

如果您仍然遇到此问题。假设您在 mongo 集合中有 2 个文档。这将给出错误,因为structType otherDetails字段(isExternalUser,isPrivate(中的子元素具有布尔值和字符串。因此,两者都应更改为字符串或布尔值以使其工作。同时,它在所有集合文档中可能有也可能没有某个字段(这里是内部在第二位不存在(。

{
"_id" : ObjectId("5aa78d90d169ed325063b06d"),
"Name" : Kailash Test,
"EmpId" : 1234567,
"company" : "test.com",
"otherDetails" : {
    "isPrivate" : false,
    "isInternal" : false,
    "isExternalUser" : true
},
}
{
"_id" : ObjectId("5aa78d90d169ed123456789d"),
"Name" : Kailash Test2,
"EmpId" : 1234567,
"company" : "test.com",
"otherDetails" : {
    "isPrivate" : "false",
    "isExternalUser" : "true"
},
}

相关内容

最新更新