按字典的键和值过滤 Spark 数据框



我得到了一个有很多行的spark数据帧(Scala(,其中有一列是以下格式的字典(Json字符串(:

[{"ID1":111,"ID2":2,"value":"Z"},
{"ID1":222,"ID2":3,"value":"A"},
{"ID1":333,"ID2":4,"value":"Z"},
{"ID1":444,"ID2":5,"value":"B"},
{"ID1":555,"ID2":6,"value":"Z"},
{"ID1":666,"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8,"value":"A"}]

我想过滤数据帧,所以它只保留包含特定组合的行,例如ID1=111,ID2=2,value=Z。注意:并非所有行都可以具有所有键,例如,一行可能没有组合键"ID1=111〃;。

如何在Scala spark中高效地完成这项工作?谢谢

您在这里处理的是一个结构数组,它可能有点复杂。让我们将问题分为两部分:首先,我们将把该列解析为一个结构数组。您可以使用函数from_json。但为了使其发挥作用,您需要提前提供模式。因此,这里有一个工作示例(这个案例涵盖了您提到的一些字段的缺失(。


import spark.implicits._
val input = Seq[(String)](
("""[{"ID1":111,"ID2":2,"value":"Z"},
{"ID1":222,"ID2":3,"value":"A"},
{"ID1":333,"ID2":4,"value":"Z"},
{"ID1":444,"ID2":5,"value":"B"},
{"ID1":555,"ID2":6,"value":"Z"},
{"ID1":666,"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8,"value":"A"}]"""
),
("""[{"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8}]"""
),
).toDF("json")

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.types.{ArrayType, StructType, StructField, IntegerType, StringType}
val jsonSchema = new ArrayType(
new StructType()
.add("ID1", IntegerType)
.add("ID2", IntegerType)
.add("value", StringType)
, false
)
val parsed = input.withColumn("parsed_json", from_json(col("json"), jsonSchema))

现在我们有了这个,我们可以利用existing函数(这是一个很新的函数,请检查:https://mungingdata.com/spark-3/array-exists-forall-transform-aggregate-zip_with/),我们将创建一个新列,它将告诉我们是否有任何结构与我们期望的值匹配。

import org.apache.spark.sql.functions.exists
import org.apache.spark.sql.Column
def matches(id1: Integer, id2: Integer, value: String)(col: Column) = {
col.getField("ID1") === id1 && col.getField("ID2") === id2 && col.getField("value") === value
}
display(parsed.withColumn("found", exists(col("parsed_json"), matches(111,2,"Z"))))
+-------------------------------------------------------------------------------------------+-----+
|parsed_json                                                                                |found|
+-------------------------------------------------------------------------------------------+-----+
|[{111, 2, Z}, {222, 3, A}, {333, 4, Z}, {444, 5, B}, {555, 6, Z}, {666, 7, Z}, {777, 8, A}]|true |
|[{null, 7, Z}, {777, 8, null}]                                                             |false|
+-------------------------------------------------------------------------------------------+-----+

使用这个全新的列,您可以过滤数据帧并确保该模式。它可能很多,但相当健壮。

Spark SQL中有几个函数可以帮助您处理json值。可以帮助您的请求的:

  • get_json_object:根据指定的json路径从json字符串中提取json对象,并返回提取的json对象的json字符串。如果您需要访问json的特定部分,这将非常有用
  • from_json:根据json的模式,将json字符串列转换为结构列。转换后,您可以访问所需的所有字段

现在,假设您有一个数据集df,其中有一个包含json字符串的json列:

json
{ "ID1": 111, "ID2": 2, "value": "Z" }
{ "ID1": 222, "ID2": 3, "value": "A" }
{ "ID1": 333, "ID2": 4, "value": "Z" }
{ "ID1": 444, "ID2": 5, "value": "B" }
{ "ID1": 555, "ID2": 6, "value": "Z" }
{ "ID1": 666, "ID2": 7, "value": "Z" }
{ "ID1": 777, "ID2": 8, "value": "A" }

最新更新