dataframe Spark scala explode json array



假设我有一个数据帧,如下所示:

+--------------------+--------------------+--------------------------------------------------------------+
|                id  |           Name     |                                                       Payment|
+--------------------+--------------------+--------------------------------------------------------------+
|                1   |           James    |[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|
+--------------------+--------------------+--------------------------------------------------------------+

架构是:

|-- id: integer (nullable = true)
|-- Name: string (nullable = true)   
|-- Payment: string (nullable = true)

如何将上述 JSON 数组分解为以下内容:

+--------------------+--------------------+-------------------------------+
|                id  |           Name     |                        Payment|
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   {"@id":1, "currency":"GBP"} |
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   {"@id":2, "currency":"USD"} |
+--------------------+--------------------+-------------------------------+

我一直在尝试使用如下所示的爆炸功能,但它不起作用。它给出了一个错误,即无法分解字符串类型,并且它需要映射或数组。这是有道理的,因为架构表示它是一个字符串,而不是一个数组/映射,但我不确定如何将其转换为适当的格式。

val newDF = dataframe.withColumn("nestedPayment", explode(dataframe.col("Payment")))

任何帮助将不胜感激!

您必须

将 JSON 字符串解析为 JSON 数组,然后在结果上使用 explode(explode 需要一个数组(。

为此(假设 Spark 2.0.*(:

  • 如果您知道所有Payment值都包含一个表示相同大小的数组(例如本例中的 2(的 json,则可以对第一个和第二个元素的提取进行硬编码,将它们包装在一个数组中并分解:

    val newDF = dataframe.withColumn("Payment", explode(array(
      get_json_object($"Payment", "$[0]"),
      get_json_object($"Payment", "$[1]")
    )))
    
  • 如果不能保证所有记录都具有具有 2 元素数组的 JSON,但可以保证这些数组的最大长度,则可以使用此技巧将元素解析为最大大小,然后过滤掉生成的null

    val maxJsonParts = 3 // whatever that number is...
    val jsonElements = (0 until maxJsonParts)
                         .map(i => get_json_object($"Payment", s"$$[$i]"))
    val newDF = dataframe
      .withColumn("Payment", explode(array(jsonElements: _*)))
      .where(!isnull($"Payment")) 
    
import org.apache.spark.sql.types._
val newDF = dataframe.withColumn("Payment", 
explode(
from_json(
  get_json_object($"Payment", "$."),ArrayType(StringType)
)))

我的解决方案是将您的 json 数组字符串包装成 json 字符串,以将from_json函数与字符串数组的结构类型一起使用

val dataframe = spark.sparkContext.parallelize(Seq(("1", "James", "[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]"))).toDF("id", "Name", "Payment")
val result = dataframe.withColumn("wrapped_json", concat_ws("", lit("{"array":"), col("Payment"), lit("}")))
    .withColumn("array_json", from_json(col("wrapped_json"), StructType(Seq(StructField("array", ArrayType(StringType))))))
    .withColumn("result", explode(col("array_json.array")))

结果:

+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+
|id |Name |Payment                                                       |wrapped_json                                                            |array_json                                                |result                    |
+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+
|1  |James|[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|{"array":[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]}|[[{"@id":1,"currency":"GBP"}, {"@id":2,"currency":"USD"}]]|{"@id":1,"currency":"GBP"}|
|1  |James|[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|{"array":[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]}|[[{"@id":1,"currency":"GBP"}, {"@id":2,"currency":"USD"}]]|{"@id":2,"currency":"USD"}|
+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+

我正在使用 spark 2.3.2,Kudakwashe Nyatsanza 的解决方案对我不起作用,它抛出了org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(value)' due to data type mismatch: Input schema array<string> must be a struct or an array of structs.

您可以使用 ArrayType 定义付款 json 数组的架构。

import org.apache.spark.sql.types._
val paymentSchema = ArrayType(StructType(
                  Array(
                        StructField("@id", DataTypes.IntegerType),
                        StructField("currency", DataTypes.StringType)
                  )
))

然后,在将from_json与此架构一起使用后爆炸将返回所需的结果。

val newDF = dataframe.withColumn("Payment", explode(from_json($"Payment", paymentSchema)))

最新更新