我正在尝试将数据帧中的几个字段写入JSON。我在数据帧中的数据结构是
Key|col1|col2|col3|col4
key|a |b |c |d
Key|a1 |b1 |c1 |d1
现在,我正试图将col1到col4字段转换为JSON,并为JSON字段命名
预期输出
[Key,{cols:[{col1:a,col2:b,col3:c,col4:d},{col1:a1,col2:b1,col3:c1,col4:d1}]
我为此写了一个udf。
val summary = udf(
(col1:String, col2:String, col3:String, col4:String) => "{"cols":[" + " {"col1":" + col1 + ","col2":" + col2 + ","col3":" + col3 + ","col4":" + col4 + "}]}"
)
val result = input.withColumn("Summary",summary('col1,'col2,'col3,'col4))
val result1 = result.select('Key,'Summary)
result1.show(10)
这是我的结果
[Key,{cols:[{col1:a,col2:b,col3:c,col4:d}]}]
[Key,{cols:[{col1:a1,col2:b1,col3:c1,col4:d1}]}]
正如您所看到的,它们没有分组。有没有一种方法可以使用UDF本身对这些行进行分组。我是scala/Spark的新手,无法找到合适的udf。
// Create your dataset
scala> val ds = Seq((1, "hello", 1L), (2, "world", 2L)).toDF("id", "token", "long")
ds: org.apache.spark.sql.DataFrame = [id: int, token: string ... 1 more field]
// select the fields you want to map to json
scala> ds.select('token, 'long).write.json("your-json")
// check the result
➜ spark git:(master) ✗ ls -ltr your-json/
total 16
-rw-r--r-- 1 jacek staff 27 11 kwi 17:18 part-r-00007-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00006-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00005-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00004-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 27 11 kwi 17:18 part-r-00003-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00002-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00001-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 part-r-00000-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
-rw-r--r-- 1 jacek staff 0 11 kwi 17:18 _SUCCESS
➜ spark git:(master) ✗ cat your-json/part-r-00003-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
{"token":"hello","long":1}
➜ spark git:(master) ✗ cat your-json/part-r-00007-91f81f62-54bb-42ae-bddc-33829a0e3c16.json
{"token":"world","long":2}
UDF将把一行映射到一行。如果DataFrame
中有多行要合并为一个元素,则需要使用类似reduceByKey
的函数来聚合多行。
可能有一个特定于DataFrame
的函数来实现这一点,但我会用RDD
的函数来进行处理,比如:
val colSummary = udf(
(col1:String, col2:String, col3:String, col4:String) => "{"col1":" + col1 + ","col2":" + col2 + ","col3":" + col3 + ","col4":" + col4 + "}"
)
val colRDD = input.withColumn("Summary",summary('col1,'col2,'col3,'col4)).rdd.map(x => (x.getString(0),x.getString(5)))
这为我们提供了一个RDD[(String,String)]
,它将允许我们像使用reduceByKey
一样使用PairRDDFunctions
(请参阅文档)。元组的键是原始键,值是单个元素的json编码,我们需要将其聚合在一起以形成cols
列表。我们把它们粘在一起,形成一个逗号分隔的列表,然后加上开头和结尾,就完成了。
val result = colRDD.reduceByKey((x,y) => (x+","+y)).map(x => "["+x._1+",{"cols":["+x._2+"]}]")
result.take(10)