我有一个文件,其中每一行都是字符串化的JSON。我想把它和模式验证一起读到Spark DataFrame中。
天真的做法是:
val schema: StructType = getSchemaFromSomewhere()
val df: DataFrame = spark.read
.option("mode", "DROPMALFORMED")
.format("json")
.schema(schema)
.load("path/to/data.json")
然而,这种方法只执行一些非常基本的模式验证。
- 如果一行不能作为json解析,它将被删除
- 如果一行包含的属性的值无法强制转换为
schema
定义的类型,则该行将被删除 - 但是-此加载方法忽略不可为null的字段(使它们在生成的DF中为null(,并且不允许填充默认值
方法2-使用JsonSchema
为了做到这一点,我不能再使用spark.read.json()
了,因为我需要JsonNode
格式的数据。因此,我将其作为文本文件读取,并使用JsonSchema库进行解析:
def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault
def stringToJsonSchema(str: String): Try[JsonSchema] = {
stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}
def stringToJson(str: String): Try[JsonNode] = {
val mapper = new ObjectMapper
Try({
val json = mapper.readTree(str)
json
})
}
def validateJson(data: JsonNode): Boolean = {
jsonSchema.exists(jsonSchema => {
val report = jsonSchema.validateUnchecked(data, true)
report.isSuccess
})
}
lazy val jsonSchema: Option[JsonSchema] = stringToJsonSchema(schemaSource).toOption
val schema: StructType = getSchemaFromSomewhere()
val df = spark.read
.textFile("path/to/data.json")
.filter(str => {
stringToJson(str)
.map(validateJson)
.getOrElse(false)
})
.select(from_json($"value", schema) as "jsonized")
.select("jsonized.*")
现在的问题是,我要将每个string
行解析为json两次——一次在filter
中,另一次在select(from_json ...)
中。
我在找什么
某种方法可以将JSON数据从文件读取到DataFrame,同时对所有数据应用JsonSchema验证——无效数据应该被丢弃(可能还会记录在某个地方(。
- 有没有一种方法可以将
Dataset[JsonNode]
转换为DataFrame
而不需要多次解析 - 有没有办法将DF
Row
转换为JsonNode
对象?这样,我可以颠倒顺序——首先使用spark.read.json()
读取DF,然后通过将每个Row
转换为JsonNode
并应用JsonSchema
来过滤DF - 我在这里还有什么遗漏吗
感谢
有没有一种方法可以将数据集[JsonNode]转换为DataFrame而不需要多次解析?
在大多数情况下,与作业的总CPU使用量相比,解析两次的开销可能会被忽略。
如果不是这样,您可以在DataSourceV2
中实现自己的TableProvider
。如果解析需求可能会随着时间的推移而变化或发展,那么这可能是一个不错的长期解决方案。