在JSON创建的数据框架上应用过滤条件



我正在处理JSON创建的数据框架,然后我想通过数据框应用过滤条件。

val jsonStr = """{ "metadata": [{ "key": 84896, "value": 54 },{ "key": 1234, "value": 12 }]}"""
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)

df

的模式
root
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: long (nullable = true)
 |    |    |-- value: long (nullable = true)

现在,我需要过滤我想做的数据框架

val df1=df.where("key == 84896")

引发错误

ERROR Executor - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [metadata]; line 1 pos 0;
'Filter ('key = 84896)

我要使用子句的原因是我想直接使用的表达式字符串例如( (key == 999, value == 55) || (key == 1234, value == 12) )

首先,您应该使用 explode获取易于工作的数据框架。然后,您可以选择输入的密钥和值:

val explodedDF = df.withColumn("metadata", explode($"metadata"))
  .select("metadata.key", "metadata.value")

输出:

+-----+-----+
|  key|value|
+-----+-----+
|84896|   54|
| 1234|   12|
+-----+-----+

这样,您就可以照常执行过滤逻辑:

scala> explodedDF.where("key == 84896").show
+-----+-----+
|  key|value|
+-----+-----+
|84896|   54|
+-----+-----+

您可以加入过滤要求,以下一些示例:

explodedDF.where("key == 84896 AND value == 54")
explodedDF.where("(key == 84896 AND value == 54) OR key = 1234")

从我的问题和评论中了解的是,您正在尝试应用( (key == 999, value == 55) || (key == 1234, value == 12) )表达式以过滤DataFrame行。

首先,表达式需要更改,因为它不能用作 spark 中的 dataframe的表达式,因此您需要更改为

val expression = """( (key == 999, value == 55) || (key == 1234, value == 12) )"""
val actualExpression = expression.replace(",", " and").replace("||", "or")

应该为您提供新的有效的表达

( (key == 999 and value == 55) or (key == 1234 and value == 12) )

现在您拥有有效的表达式,您的dataframe也需要修改,因为您无法在arraystruct上查询此类表达式

因此,您需要 explode函数爆炸array元素转换为不同的行,然后使用 .*符号在不同列上选择 struct的所有元素。

val df1 = df.withColumn("metadata", explode($"metadata"))
  .select($"metadata.*")

应该给您dataframe作为

+-----+-----+
|key  |value|
+-----+-----+
|84896|54   |
|1234 |12   |
+-----+-----+

最终在生成的dataframe上使用有效的表达式

df1.where(s"${actualExpression}")

我希望答案有帮助

最新更新