How to combine rows of a Spark DataFrame into a valid JSON column and write it to MySQL



I am trying to combine multiple rows into valid JSON format in a single column (Spark DataFrame, Spark 1.6.1), and then store the result in a MySQL table.

My original Spark DataFrame is as follows:

|user_id   |product_id|price       | 
|A         |p1        |3000        |
|A         |p2        |1500        |
|B         |P1        |3000        |
|B         |P3        |2000        |

I want to transform the table above like this:

|user_id   |contents_json 
|A         |{(product_id:p1, price:3000), (product_id:p2, price:1500)} 
|B         |{(product_id:p1, price:3000), (product_id:p3, price:2000)} 

Then I want to write that table into a MySQL table.

It is essentially the opposite of explode, but I can't find the right way to do it.

I assume you are looking for the JSON output shown below.

from pyspark.sql.functions import col, collect_list, struct
df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
                     ('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])

Spark 2.0+:

# Collect each user's (product_id, price) pairs into an array of structs
df1 = df.groupBy("user_id") \
    .agg(collect_list(struct(col("product_id"), col("price"))).alias("contents_json"))
df1.show()
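
If you need the grouped column as an actual JSON string rather than an array of structs (for example, to store it in a single MySQL column later), here is a minimal sketch, assuming Spark 2.1+ where to_json is available:

from pyspark.sql.functions import to_json

# Serialize the array of structs into a JSON string column (requires Spark 2.1+)
df_json = df1.withColumn("contents_json", to_json(col("contents_json")))
df_json.show(truncate=False)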

Spark 1.6:

import pyspark.sql.functions as psf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Zip the collected product_id and price lists back into an array of structs
zipCols = psf.udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("product_id", StringType()),
      StructField("price", IntegerType())
  ]))
)

df1 = df.groupBy("user_id").agg(
    zipCols(
        collect_list(col("product_id")), 
        collect_list(col("price"))
    ).alias("contents_json")
)

for row in df1.toJSON().collect():
    print(row)

The output is:

{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
