Concatenate all columns and dump them as JSON in Spark



I have a table like the one below:

+---+----------+-----+----------+
|id |product_id|agent|order_date|
+---+----------+-----+----------+
|1  |1         |1    |2021-09-13|
|1  |2         |1    |2021-09-13|
|1  |3         |1    |2021-09-13|
|2  |1         |1    |2021-09-13|
|2  |2         |1    |2021-09-13|
+---+----------+-----+----------+

The approach below works; the important functions are to_json and alias. I wrote it in Scala, but it should be straightforward to convert to Python.

import spark.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{struct, to_json}

// just to create the dataset for the example you have given
val data = Seq(
  ("1", "1", "1", "2021-09-13"),
  ("1", "2", "1", "2021-09-13"),
  ("1", "3", "1", "2021-09-13"),
  ("2", "1", "1", "2021-09-13"),
  ("2", "2", "1", "2021-09-13"))

val dataset = data.toDF("id", "product_id", "agent", "order_date")

// build the key mapping programmatically (e.g. by looping) if it is not static
val keyMapping: Map[String, String] = Map(
  "id" -> "internal_id",
  "product_id" -> "item_id",
  "agent" -> "associate_id",
  "order_date" -> "transaction_date")

// turn each (old name -> new name) pair into an aliased Column
val columns = keyMapping.map(f => new Column(f._1).alias(f._2)).toSeq

dataset.withColumn("json_data", to_json(struct(columns: _*))).show(false)
//output 
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|id |product_id|agent|order_date|json_data                                                                           |
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|1  |1         |1    |2021-09-13|{"internal_id":"1","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |2         |1    |2021-09-13|{"internal_id":"1","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |3         |1    |2021-09-13|{"internal_id":"1","item_id":"3","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |1         |1    |2021-09-13|{"internal_id":"2","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |2         |1    |2021-09-13|{"internal_id":"2","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
+---+----------+-----+----------+------------------------------------------------------------------------------------+
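If you only need the JSON strings themselves rather than the original columns alongside them, a minimal sketch building on the same columns sequence could select just the generated column and write it out as text, one JSON object per line. The column name json_data and the output path json_output are illustrative, not part of the original answer.

// keep only the JSON column and write it as plain text (one JSON object per line)
// path "json_output" is illustrative
dataset
  .withColumn("json_data", to_json(struct(columns: _*)))
  .select("json_data")
  .write
  .text("json_output")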

This question may already be answered here: How to save a dataframe to a JSON file with the multiline option in pyspark.

I use Scala, so this is what I would do:

// mock up data, you don't need this if you already have a dataframe ready
import spark.implicits._

val data = Seq(
  ("1", "1", "1", "2021-09-13"),
  ("1", "2", "1", "2021-09-13"),
  ("1", "3", "1", "2021-09-13"),
  ("2", "1", "1", "2021-09-13"),
  ("2", "2", "1", "2021-09-13")
)
val df = data.toDF("id", "product_id", "agent", "order_date")
// mocking data is finished here, you now have a dataframe ready to work with

// rename columns to the desired JSON field names
val new_df = df
  .withColumnRenamed("id", "internal_id")
  .withColumnRenamed("product_id", "item_id")
  .withColumnRenamed("agent", "associate_id")
  .withColumnRenamed("order_date", "transaction_date")

// writing json
new_df.write.json("vikas.json")
