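I have a table like the one below:
+---+----------+-----+----------+
|id |product_id|agent|order_date|
+---+----------+-----+----------+
|1  |1         |1    |2021-09-13|
|1  |2         |1    |2021-09-13|
|1  |3         |1    |2021-09-13|
|2  |1         |1    |2021-09-13|
|2  |2         |1    |2021-09-13|
+---+----------+-----+----------+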
You can use the approach below. The important functions are to_json and alias. I wrote it in Scala, but it should be straightforward to convert to Python.
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._
// just to create the dataset for the example you have given
val data = Seq(
  ("1", "1", "1", "2021-09-13"),
  ("1", "2", "1", "2021-09-13"),
  ("1", "3", "1", "2021-09-13"),
  ("2", "1", "1", "2021-09-13"),
  ("2", "2", "1", "2021-09-13"))
val dataset = data.toDF("id", "product_id", "agent", "order_date")
// create the key mapping programmatically by looping if it's not static
val keyMapping: Map[String, String] = Map("id" -> "internal_id", "product_id" -> "item_id", "agent" -> "associate_id", "order_date" -> "transaction_date")
// alias each source column to its new key name
val columns = keyMapping.map { case (oldName, newName) =>
  new Column(oldName).alias(newName)
}.toSeq
// pack the aliased columns into a struct and serialize it as a JSON string
dataset.withColumn("json_data", to_json(struct(columns: _*))).show(false)
//output
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|id |product_id|agent|order_date|json_data                                                                           |
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|1  |1         |1    |2021-09-13|{"internal_id":"1","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |2         |1    |2021-09-13|{"internal_id":"1","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |3         |1    |2021-09-13|{"internal_id":"1","item_id":"3","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |1         |1    |2021-09-13|{"internal_id":"2","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |2         |1    |2021-09-13|{"internal_id":"2","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
+---+----------+-----+----------+------------------------------------------------------------------------------------+
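If the mapping is not static, it can be built programmatically, as the comment in the code suggests. Below is a minimal sketch, not a definitive implementation. It assumes the old and new names arrive as two parallel lists; oldNames, newNames, withJson, the object name, and the local SparkSession setup are all hypothetical names introduced for illustration. It keeps the pairs in a Seq rather than a Map, because a Scala immutable Map with more than four entries does not guarantee iteration order, and that order decides the order of the keys in the JSON.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object KeyMappingSketch {
  // Adds a "json_data" column holding the renamed fields as a JSON string.
  // The order of `mapping` is the order of the keys in the JSON.
  def withJson(df: DataFrame, mapping: Seq[(String, String)]): DataFrame = {
    val columns = mapping.map { case (oldName, newName) => col(oldName).alias(newName) }
    df.withColumn("json_data", to_json(struct(columns: _*)))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption); reuse your own session in practice
    val spark = SparkSession.builder().master("local[*]").appName("key-mapping-sketch").getOrCreate()
    import spark.implicits._

    val dataset = Seq(
      ("1", "1", "1", "2021-09-13"),
      ("1", "2", "1", "2021-09-13")
    ).toDF("id", "product_id", "agent", "order_date")

    // Hypothetical inputs: in practice these might come from a config file
    val oldNames = Seq("id", "product_id", "agent", "order_date")
    val newNames = Seq("internal_id", "item_id", "associate_id", "transaction_date")
    // zip silently truncates to the shorter list, so check the lengths first
    require(oldNames.length == newNames.length, "name lists must have the same length")

    val keyMapping: Seq[(String, String)] = oldNames.zip(newNames)
    withJson(dataset, keyMapping).show(false)

    spark.stop()
  }
}
```

The original columns are left untouched; only the struct fields inside json_data carry the new names.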
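This question may already have been answered here: How to save a dataframe to a JSON file with the multiline option in pyspark
I use Scala, so this is what I would do: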
// mock up data; you don't need this if you already have a dataframe ready
val data = Seq(
("1", "1", "1", "2021-09-13"),
("1", "2", "1", "2021-09-13"),
("1", "3", "1", "2021-09-13"),
("2", "1", "1", "2021-09-13"),
("2", "2", "1", "2021-09-13")
)
import spark.implicits._
val df = data.toDF("id", "product_id", "agent", "order_date")
// mocking data is finished here, you now have a dataframe ready to work with
// changing column names to desired names
val new_df = df
.withColumnRenamed("id", "internal_id")
.withColumnRenamed("product_id", "item_id")
.withColumnRenamed("agent", "associate_id")
.withColumnRenamed("order_date", "transaction_date")
// writing JSON (Spark creates a directory named vikas.json containing part files, one JSON object per line)
new_df.write.json("vikas.json")
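The chain of withColumnRenamed calls above can also be driven by a list of name pairs, and the number of output files can be controlled before writing. The following is a minimal sketch under stated assumptions: renameAll, the object name, and the local SparkSession are hypothetical, and coalesce(1) is only reasonable when the data fits comfortably in a single partition. Even then, Spark writes a directory named vikas.json with a single part file inside it, not a bare file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RenameAndWriteSketch {
  // Applies withColumnRenamed once per (old, new) pair by folding over the DataFrame
  def renameAll(df: DataFrame, mapping: Seq[(String, String)]): DataFrame =
    mapping.foldLeft(df) { case (acc, (oldName, newName)) =>
      acc.withColumnRenamed(oldName, newName)
    }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption); reuse your own session in practice
    val spark = SparkSession.builder().master("local[*]").appName("rename-and-write-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("1", "1", "1", "2021-09-13"),
      ("2", "1", "1", "2021-09-13")
    ).toDF("id", "product_id", "agent", "order_date")

    val renamed = renameAll(df, Seq(
      "id" -> "internal_id",
      "product_id" -> "item_id",
      "agent" -> "associate_id",
      "order_date" -> "transaction_date"
    ))

    // coalesce(1) yields one part file; mode("overwrite") replaces an existing output directory
    renamed.coalesce(1).write.mode("overwrite").json("vikas.json")

    spark.stop()
  }
}
```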