我试图将多个行合并为一列中的vaild json格式(spark dataframe(spark 1.6.1))。然后我希望将其存储在MySQL表中。
我的起源火花数据帧如下:
|user_id |product_id|price |
|A |p1 |3000 |
|A |p2 |1500 |
|B |P1 |3000 |
|B |P3 |2000 |
我想这样转换上表:
|user_id |contents_json
|A |{(product_id:p1, price:3000), (product_id:p2, price:1500)}
|B |{{product_id:p1, price:3000), (product_id:p3, price:2000)}
然后将表格放入mysql表中。
它完全相反的爆炸方式,但我找不到正确的方法。
我假设您正在寻找下面显示的JSON输出。
from pyspark.sql.functions import col, collect_list, struct
df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])
>spark2.0
df1 = df.
groupBy("user_id").agg(collect_list(struct(col("product_id"),col("price"))).alias("contents_json"))
df1.show()
Spark1.6
zipCols = psf.udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("product_id", StringType()),
StructField("price", IntegerType())
]))
)
df1 = df.
groupBy("user_id").agg(
zipCols(
collect_list(col("product_id")),
collect_list(col("price"))
).alias("contents_json")
)
for row in df1.toJSON().collect():
print row
输出是:
{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}