是否有人知道Apache Spark SQL实现与标准SQL qualit_() + rnk或row_number语句相同结果的最佳方式?
例如:- 我有一个名为statement_data的Spark Dataframe,其中有12个月记录,每个记录有100个唯一的account_number,因此总共有1200条记录
- 每个月记录都有一个名为"statement_date"的字段,可用于确定最近的记录
我希望我的最终结果是一个新的Spark Dataframe,其中包含100个唯一account_number中的每一个最近的3条记录(由statement_date降序确定),因此总共有300条最终记录。
在标准Teradata SQL中,我可以做以下操作:
select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3
Apache Spark SQL没有一个独立的限定函数,我知道,也许我搞砸了语法或找不到文档,限定存在。
如果我需要分两步完成,只要这两步是:
- 为每个account_number的记录分配秩/行号的选择查询或替代方法
- 一个选择查询,我选择所有记录与排名<= 3(即选择第1,第2和第3最近的记录)。
编辑1 - 7/23 2:09pm:zero323提供的初始解决方案在安装了Spark SQL 1.4.1依赖项的Spark 1.4.1中不适合我。
编辑2 - 7/23 3:24pm:事实证明,错误与使用SQL上下文对象而不是Hive上下文查询有关。在添加以下代码来创建和使用Hive上下文后,我现在能够正确运行以下解决方案:
final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext());
....
// Initial Spark/SQL contexts to set up Dataframes
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
DataFrame stmtSummary =
hc2.sql("SELECT * FROM (SELECT acct_id, stmt_end_dt, stmt_curr_bal, row_number() over (partition by acct_id order by stmt_curr_bal DESC) rank_num FROM stmt_data) tmp WHERE rank_num <= 3");
没有qualify
(检查解析器源通常很有用),但您可以使用这样的子查询:
SELECT * FROM (
SELECT *, row_number() OVER (
PARTITION BY acct_id ORDER BY statement_date DESC
) rank FROM df
) tmp WHERE rank <= 3
参见SPARK: failure: ' ' union''但' ('找到