如何将一列 JSON 字符串转换为镶木地板表



我正在尝试将我收到的一些数据转换为最终可用于报告的镶木地板表,但感觉我错过了一个步骤。

我收到的文件是CSV,其中格式为"id","事件","源",其中"事件"列是GZIP压缩的JSON字符串。我已经能够设置一个数据帧来提取三列,包括解压缩 JSON 字符串。所以我现在有一张桌子

id | event | source | unencoded_event

其中unencoded_event是 JSON 字符串。

此时我想做的是获取 JSON 的一个字符串列并将其解析为单独的列。根据另一位开发人员的评论(转换为镶木地板的过程足够聪明,只需使用我结果的第一行来找出模式(,我尝试了这个:

df1 = spark.read.json(df.select("unencoded_event").rdd).write.format("parquet").saveAsTable("test")

但这只给了我一个单列表,其中有一列_corrupt_record,它再次包含 JSON 字符串。

我想得到的是采用模式:

{
"agent"
--"name"
--"organization"
"entity"
--"name"
----"type"
----"value"
}

并最终使表格看起来像:AgentName | Organization | EventType | EventValue

我缺少的步骤只是显式定义架构还是我过度简化了我的方法?

这里的潜在复杂性:JSON 模式实际上比上面更复杂;我一直假设我可以将整个架构扩展到更广泛的表中,然后只返回我关心的较小集合。

我还尝试从文件中获取单个结果(因此,单个 JSON 字符串(,将其另存为 JSON 文件并尝试从中读取。这样做是有效的,即执行spark.read.json(myJSON.json)将字符串解析为我期望的数组。如果我复制多个字符串,也是如此。

如果我获取原始结果并尝试保存它们,则这不起作用。如果我尝试仅将字符串列另存为 json 文件

dfWrite = df.select(col("unencoded_event"))
dfWrite.write.mode("overwrite").json(write_location)

然后把它们读回来,这行为方式不一样......每一行仍被视为字符串。

我确实找到了一个有效的解决方案。这不是一个完美的解决方案(我担心它不可扩展(,但它让我到达了我需要的地方。

我可以使用get_json_object()为我想要的每一列选择数据(抱歉,我一天中一直在摆弄列名等(:

dfResults = df.select(get_json_object("unencoded_event", "$.agent[0].name").alias("userID"), 
get_json_object("unencoded_event", "$.entity[0].identifier.value").alias("itemID"),
get_json_object("unencoded_event", "$.entity[0].detail[1].value").alias("itemInfo"),
get_json_object("unencoded_event", "$.recorded").alias("timeStamp"))

我不喜欢这一点的一件大事是,我似乎无法将过滤器/搜索选项与get_json_object()一起使用。对于可预见的未来来说,这很好,因为现在我知道所有数据应该在哪里,不需要过滤。

我相信我也可以使用from_json()但这需要在笔记本中定义架构。这不是一个好的选择,因为我只需要 JSON 的一小部分,所以定义整个架构感觉像是不必要的工作。(我也无法控制整体架构是什么,因此这成为维护问题。

最新更新