Spark: How to merge rows into a JSON array



Input:

id1   id2    name   value           epid
"xxx" "yyy"  "EAN"  "5057723043"    "1299"
"xxx" "yyy"  "MPN"  "EVBD"          "1299"

I want:

{         "id1": "xxx",
          "id2": "yyy",
          "item_specifics": [
            {
              "name": "EAN",
              "value": "5057723043"
            },
            {
              "name": "MPN",
              "value": "EVBD"
            },
            {
              "name": "EPID",
              "value": "1299"
            }
          ]
}

I tried the following two solutions, taken from "How to aggregate columns into a JSON array?" and "How to merge rows into a column as valid JSON to write it to MySQL":

pi_df.groupBy(col("id1"), col("id2"))
  //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
  .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))

But I got:

{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }

How can I fix this? Thanks.

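For Spark < 2.4

You can create two DataFrames, one holding name and value, and another holding "EPID" as the name and the epid column as the value, and union them. Then aggregate the result with collect_set and build the JSON. The code should look like this.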

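import org.apache.spark.sql.functions._
// Creating test data (toDF needs spark.implicits._, which spark-shell imports automatically)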
val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
  .toDF("id1", "id2", "name", "value", "epid")
df.show(false)
+---+---+----+----------+----+
|id1|id2|name|value     |epid|
+---+---+----+----------+----+
|xxx|yyy|EAN |5057723043|1299|
|xxx|yyy|MPN |EVBD      |1299|
+---+---+----+----------+----+
val df1 = df.withColumn("map", struct(col("name"), col("value")))
  .select("id1", "id2", "map")
val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
  .select("id1", "id2", "map")
val jsonDF = df1.union(df2).groupBy("id1", "id2")
  .agg(collect_set("map").as("item_specifics"))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))
jsonDF.select("json").show(false)
+---------------------------------------------------------------------------------------------------------------------------------------------+
|json                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------+

For Spark 2.4

Spark 2.4 provides an array_union function, which may let you do this without the union. I have not tried it, though.

val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
  // "EPID" is upper case here to match the requested output
  .withColumn("map2", struct(lit("EPID").as("name"), col("epid").as("value")))
  .groupBy("id1", "id2")
  .agg(collect_set("map1").as("item_specifics1"),
    collect_set("map2").as("item_specifics2"))
  .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2")))
  // Serialize the merged array, not item_specifics2 alone
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))

You are very close. I believe you are looking for something like this:

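// Build the EPID rows explicitly: renaming epid to "value" while the original
// "value" column still exists would leave two columns with the same name.
val pi_df2 = pi_df.select(col("id1"), col("id2"), lit("EPID").as("name"), col("epid").as("value")).
  distinct() // keep a single EPID row per (id1, id2, epid)
pi_df.select("id1", "id2", "name", "value").
  union(pi_df2).withColumn("item_specific", struct(col("name"), col("value"))).
  groupBy(col("id1"), col("id2")).
  agg(collect_list(col("item_specific")).alias("item_specifics")).
  write.json(...)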

The union should bring the EPID back into item_specifics.

Here is what you need to do:
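
To illustrate why the union brings EPID back, here is a minimal sketch of the same reshape on plain Scala collections, with no Spark involved. The names InputRow, Item, quote and toJson are made up for this illustration and are not part of any Spark API; the escaping is deliberately simplistic.

```scala
object MergeRowsSketch {
  // Hypothetical types for the illustration only
  final case class InputRow(id1: String, id2: String, name: String, value: String, epid: String)
  final case class Item(name: String, value: String)

  // Minimal JSON string quoting; only backslashes and double quotes are escaped
  def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  def toJson(rows: Seq[InputRow]): Seq[String] = {
    // "union" step: the original name/value items followed by one EPID item per row
    val items: Seq[((String, String), Item)] =
      rows.map(r => (r.id1, r.id2) -> Item(r.name, r.value)) ++
        rows.map(r => (r.id1, r.id2) -> Item("EPID", r.epid))

    // "groupBy + collect" step: group by (id1, id2) and drop duplicate items
    items.groupBy(_._1).toSeq.sortBy(_._1).map { case ((id1, id2), pairs) =>
      val specifics = pairs.map(_._2).distinct
        .map(i => s"""{"name":${quote(i.name)},"value":${quote(i.value)}}""")
        .mkString("[", ",", "]")
      s"""{"id1":${quote(id1)},"id2":${quote(id2)},"item_specifics":$specifics}"""
    }
  }

  val rows: Seq[InputRow] = Seq(
    InputRow("xxx", "yyy", "EAN", "5057723043", "1299"),
    InputRow("xxx", "yyy", "MPN", "EVBD", "1299")
  )

  def main(args: Array[String]): Unit =
    toJson(rows).foreach(println)
}
```

Because the EPID items are appended before grouping, they end up in the same group as the name/value items, and the duplicate EPID entry (one per input row) is dropped by distinct. In Spark, collect_set plays a similar de-duplicating role, but it does not guarantee element order.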

    import scala.util.parsing.json.JSONObject
    import scala.collection.mutable.WrappedArray
    import org.apache.spark.sql.functions._
    // Define the UDF
    val jsonFun = udf((id1: String, id2: String, item_specifics: WrappedArray[Map[String, String]], epid: String) => {
      // Add epid to the item_specifics entries
      val item_withEPID = item_specifics :+ Map("epid" -> epid)
      // Render each single-entry map as a name/value object and join the objects into a JSON array string
      val item_specificsArray = item_withEPID
        .map(m => JSONObject(Map("name" -> m.keys.head, "value" -> m.values.head)).toString().replace("\\", ""))
        .mkString("[", ",", "]")
      // Add id1 and id2 to the output JSON; toSeq keeps JSONObject from quoting the array string
      val m = Map("id1" -> id1, "id2" -> id2, "item_specifics" -> item_specificsArray.toSeq)
      JSONObject(m).toString().replace("\\", "")
    })
val pi_df = Seq(("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid")
// Include epid in the groupBy columns; otherwise it is not available after the aggregation
val df = pi_df.groupBy(col("id1"), col("id2"), col("epid"))
  .agg(collect_list(map(col("name"), col("value"))).as("item_specifics"))
  .withColumn("item_specifics", jsonFun($"id1", $"id2", $"item_specifics", $"epid"))
df.show(false)
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id1|id2|epid|item_specifics                                                                                                                                                      |
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}|
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Contents/output of the item_specifics column:

{
    "id1": "xxx",
    "id2": "yyy",
    "item_specifics": [{
        "name": "MPN",
        "value": "EVBD"
    }, {
        "name": "EAN",
        "value": "5057723043"
    }, {
        "name": "epid",
        "value": "1299"
    }]
}
