为什么当我反序列化并尝试提取值时,这个JSON会给出null值



我使用Spark Structured Streaming来消费从一些Kafka主题发送的消息。json字符串的结构是这样的,我想提取"created_at"、"text"one_answers"tag":

{"data":
{"created_at":"***",
"id":"***",
"text":"***"},
"matching_rules":
[{"id":"***",
"tag":"***"}]
}

我写了以下模式:

val DFschema = StructType(Array(
StructField("data", StructType(Array(
StructField("created_at", TimestampType),
StructField("text", StringType)))),
StructField("matching_rules", StructType(Array(
StructField("tag", StringType)
)))
))

当我将架构与from_json((一起使用时,我可以使用getField成功地将"created_at"one_answers"text"提取为具有非null值的列,但当我尝试对"tag"执行相同操作时,其列将填充null:

val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("failOnDataLoss", "false")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(col("key"), from_json($"value", DFschema).alias("structdata"))
.select($"key",
$"structdata.data".getField("created_at").alias("created_at"),
$"structdata.data".getField("text").alias("text"),
$"structdata.matching_rules".getField("tag").alias("topic")
)
.withColumn("hour", date_format(col("created_at"), "HH"))
.withColumn("date", date_format(col("created_at"), "yyyy-MM-dd"))

在json中,我看到"id"one_answers"tag"被包裹在方括号中,这让我怀疑我在模式中遗漏了一个数据类型,但我没有足够的经验来知道是什么。感谢您的帮助。

对于数组,您必须用ArrayType包装StructType,如下所示:

val DFschema = StructType(Array(
StructField("data", StructType(Array(
StructField("created_at", TimestampType),
StructField("id", StringType),
StructField("text", StringType)))),
StructField("matching_rules", ArrayType(StructType(Array(
StructField("tag", StringType),
StructField("id", StringType))
))))
)

另一种选择是使用(假设content是您的列(:

ds = ds.withColumn("content", 
expr("from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')")
)

您可以要求Spark通过schema_of_json生成模式。

祝你好运!

最新更新