我正在处理JSON创建的数据框架,然后我想通过数据框应用过滤条件。
val jsonStr = """{ "metadata": [{ "key": 84896, "value": 54 },{ "key": 1234, "value": 12 }]}"""
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
df
的模式root
|-- metadata: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: long (nullable = true)
| | |-- value: long (nullable = true)
现在,我需要过滤我想做的数据框架
val df1=df.where("key == 84896")
引发错误
ERROR Executor - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [metadata]; line 1 pos 0;
'Filter ('key = 84896)
我要使用子句的原因是我想直接使用的表达式字符串例如( (key == 999, value == 55) || (key == 1234, value == 12) )
首先,您应该使用 explode
获取易于工作的数据框架。然后,您可以选择输入的密钥和值:
val explodedDF = df.withColumn("metadata", explode($"metadata"))
.select("metadata.key", "metadata.value")
输出:
+-----+-----+
| key|value|
+-----+-----+
|84896| 54|
| 1234| 12|
+-----+-----+
这样,您就可以照常执行过滤逻辑:
scala> explodedDF.where("key == 84896").show
+-----+-----+
| key|value|
+-----+-----+
|84896| 54|
+-----+-----+
您可以加入过滤要求,以下一些示例:
explodedDF.where("key == 84896 AND value == 54")
explodedDF.where("(key == 84896 AND value == 54) OR key = 1234")
从我的问题和评论中了解的是,您正在尝试应用( (key == 999, value == 55) || (key == 1234, value == 12) )
表达式以过滤DataFrame行。
首先,表达式需要更改,因为它不能用作 spark 中的 dataframe
的表达式,因此您需要更改为
val expression = """( (key == 999, value == 55) || (key == 1234, value == 12) )"""
val actualExpression = expression.replace(",", " and").replace("||", "or")
应该为您提供新的有效的表达
( (key == 999 and value == 55) or (key == 1234 and value == 12) )
现在您拥有有效的表达式,您的dataframe
也需要修改,因为您无法在array
和struct
上查询此类表达式
因此,您需要 explode
函数爆炸将 array
元素转换为不同的行,然后使用 .*
符号在不同列上选择 struct
的所有元素。
val df1 = df.withColumn("metadata", explode($"metadata"))
.select($"metadata.*")
应该给您dataframe
作为
+-----+-----+
|key |value|
+-----+-----+
|84896|54 |
|1234 |12 |
+-----+-----+
最终在生成的dataframe
上使用有效的表达式
df1.where(s"${actualExpression}")
我希望答案有帮助