Spark:使用JsonSchema将Json解析为DataFrame



我有一个文件,其中每一行都是字符串化的JSON。我想把它和模式验证一起读到Spark DataFrame中。

天真的做法是:

val schema: StructType = getSchemaFromSomewhere()
val df: DataFrame = spark.read
.option("mode", "DROPMALFORMED")
.format("json")
.schema(schema)
.load("path/to/data.json")

然而,这种方法只执行一些非常基本的模式验证。

  • 如果一行不能作为json解析,它将被删除
  • 如果一行包含的属性的值无法强制转换为schema定义的类型,则该行将被删除
  • 但是-此加载方法忽略不可为null的字段(使它们在生成的DF中为null(,并且不允许填充默认值

方法2-使用JsonSchema

为了做到这一点,我不能再使用spark.read.json()了,因为我需要JsonNode格式的数据。因此,我将其作为文本文件读取,并使用JsonSchema库进行解析:

def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault
def stringToJsonSchema(str: String): Try[JsonSchema] = {
stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}
def stringToJson(str: String): Try[JsonNode] = {
val mapper = new ObjectMapper
Try({
val json = mapper.readTree(str)
json
})
}
def validateJson(data: JsonNode): Boolean = {
jsonSchema.exists(jsonSchema => {
val report = jsonSchema.validateUnchecked(data, true)
report.isSuccess
})
}
lazy val jsonSchema: Option[JsonSchema] = stringToJsonSchema(schemaSource).toOption
val schema: StructType = getSchemaFromSomewhere()
val df = spark.read
.textFile("path/to/data.json")
.filter(str => {
stringToJson(str)
.map(validateJson)
.getOrElse(false)
})
.select(from_json($"value", schema) as "jsonized")
.select("jsonized.*")

现在的问题是,我要将每个string行解析为json两次——一次在filter中,另一次在select(from_json ...)中。

我在找什么

某种方法可以将JSON数据从文件读取到DataFrame,同时对所有数据应用JsonSchema验证——无效数据应该被丢弃(可能还会记录在某个地方(。

  • 有没有一种方法可以将Dataset[JsonNode]转换为DataFrame而不需要多次解析
  • 有没有办法将DFRow转换为JsonNode对象?这样,我可以颠倒顺序——首先使用spark.read.json()读取DF,然后通过将每个Row转换为JsonNode并应用JsonSchema来过滤DF
  • 我在这里还有什么遗漏吗

感谢

有没有一种方法可以将数据集[JsonNode]转换为DataFrame而不需要多次解析?

在大多数情况下,与作业的总CPU使用量相比,解析两次的开销可能会被忽略。

如果不是这样,您可以在DataSourceV2中实现自己的TableProvider。如果解析需求可能会随着时间的推移而变化或发展,那么这可能是一个不错的长期解决方案。

相关内容

  • 没有找到相关文章

最新更新