在spark批处理作业中,我通常会将JSON数据源写入文件,并可以使用DataFrame读取器的损坏列功能将损坏的数据写入单独的位置,另一个读取器将同一作业中的有效数据写入。(数据写为镶木地板)
但在Spark Structed Streaming中,我首先通过kafka将流作为字符串读取,然后使用from_json获取我的DataFrame。然后from_json使用JsonToStructs,它在解析器中使用FailFast模式,并且不会将未解析的字符串返回到DataFrame中的列。(请参阅参考文献中的注释)那么,如何使用SSS将与我的模式不匹配的损坏数据以及可能无效的JSON写入另一个位置?
最后,在批处理作业中,同一作业可以写入两个数据帧。但是Spark结构化流需要对多个接收器进行特殊处理。然后在Spark 2.3.1(我的当前版本)中,我们应该包括如何正确写入损坏和无效流的详细信息。。。
参考编号:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html
val rawKafkaDataFrame=spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.broker)
.option("kafka.ssl.truststore.location", path.toString)
.option("kafka.ssl.truststore.password", config.pass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.security.protocol", "SSL")
.option("subscribe", config.topic)
.option("startingOffsets", "earliest")
.load()
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
当您从字符串转换为json时,如果它无法使用提供的模式进行解析,它将返回null。您可以过滤空值并选择字符串。像这样的东西。
val jsonDF = jsonDataFrame.withColumn("json", from_json(col("value"), schema))
val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
我只是想弄清楚结构化流媒体的_corrup_record等价物。以下是我的想法;希望它能让你更接近你想要的东西:
// add a status column to partition our output by
// optional: only keep the unparsed json if it was corrupt
// writes up to 2 subdirs: 'out.par/status=OK' and 'out.par/status=CORRUPT'
// additional status codes for validation of nested fields could be added in similar fashion
df.withColumn("struct", from_json($"value", schema))
.withColumn("status", when($"struct".isNull, lit("CORRUPT")).otherwise(lit("OK")))
.withColumn("value", when($"status" <=> lit("CORRUPT"), $"value"))
.write
.partitionBy("status")
.parquet("out.par")