使用不同的数据类型将SQL CSV转换为JSON



目前,我有一个csv数据,如下所示:

id,key,value
id_1,int_key,1
id_1,string_key,asd
id_1,double_key,null
id_2,double_key,2.0

我想将这些按id分组的属性及其相应的正确数据类型转换为json。

我希望有这样的json结构:

[{
id: "id_1"
attributes: {
int_key: 1,
string_key: "asd"
double_key: null
}
},
id: "id_2"
attributes: {
double_key: 2.0
}]

我目前的解决方案是在Spark中用to_json收集_list,它看起来像这样:

SELECT to_json(id, map_from_arrays(collect_list(key), collect_list(value)) as attributes GROUP BY id)

然而,这将起作用,我找不到转换为正确数据类型的方法。

[{
id: "id_1"
attributes: {
int_key: "1",
string_key: "asd"
double_key: "null"
}
},
id: "id_2"
attributes: {
double_key: "2.0"
}]

我还需要添加对null值的支持。但我已经找到了解决方案。我在to_json中使用ignoreNulls选项。因此,如果我试图枚举每个属性并将它们强制转换为相应的类型,我将包括所有定义的属性。我只想包括csv文件中定义的用户属性。

顺便说一下,我使用的是Spark 2.4。

Python:这是我从scala版本转换的PySpark版本。结果是一样的。

from pyspark.sql.functions import col, max, struct
df = spark.read.option("header","true").csv("test.csv")
keys = [row.key for row in df.select(col("key")).distinct().collect()]
df2 = df.groupBy("id").pivot("key").agg(max("value"))
df2.show()
df2.printSchema()
for key in keys:
df2 = df2.withColumn(key, col(key).cast(key.split('_')[0]))
df2.show()
df2.printSchema()
df3 = df2.select("id", struct("int_key", "double_key", "string_key").alias("attributes"))
jsonArray = df3.toJSON().collect()
for json in jsonArray: print(json)

Scala:我试图首先使用pivot来拆分每种类型的值。

val keys = df.select('key).distinct.rdd.map(r => r(0).toString).collect
val df2 = df.groupBy('id).pivot('key, keys).agg(max('value))
df2.show
df2.printSchema

然后,DataFrame如下所示:

+----+-------+----------+----------+
|  id|int_key|double_key|string_key|
+----+-------+----------+----------+
|id_2|   null|       2.0|      null|
|id_1|      1|      null|       asd|
+----+-------+----------+----------+
root
|-- id: string (nullable = true)
|-- int_key: string (nullable = true)
|-- double_key: string (nullable = true)
|-- string_key: string (nullable = true)

其中每列的类型仍然是字符串。为了铸造它,我使用了foldLeft

val df3 = keys.foldLeft(df2) { (df, key) => df.withColumn(key, col(key).cast(key.split("_").head)) }
df3.show
df3.printSchema

结果现在有collrect类型。

+----+-------+----------+----------+
|  id|int_key|double_key|string_key|
+----+-------+----------+----------+
|id_2|   null|       2.0|      null|
|id_1|      1|      null|       asd|
+----+-------+----------+----------+
root
|-- id: string (nullable = true)
|-- int_key: integer (nullable = true)
|-- double_key: double (nullable = true)
|-- string_key: string (nullable = true)

然后,您可以构建json,例如

val df4 = df3.select('id, struct('int_key, 'double_key, 'string_key) as "attributes")
val jsonArray = df4.toJSON.collect
jsonArray.foreach(println)

其中最后一行用于检查结果

{"id":"id_2","attributes":{"double_key":2.0}}
{"id":"id_1","attributes":{"int_key":1,"string_key":"asd"}}

最新更新