我需要破解一个spark查询,假设我的数据在delta表(选项卡(中,比如:-
cust date acct_id f1 f2 source_date(dd/mm/yy:h)
b1 1/10/22 acc1 x y 9/9/22:1 P.M
b1 1/10/22 acc2 x y 9/9/22:1 P.M
b1 1/10/22 acc3 x y 9/9/22:1 P.M
现在,我正在从数据库中获取更改,并将其附加到我的delta表中,例如,如果在下面的任何时刻,我使用每2分钟运行一次的调度器从数据库(使用kafka或任何其他消息代理(获得更改:-
更改:-
b1 1/10/22 acc2 p y 10/9/22:2 P.M
b1 1/10/22 acc2 p q 11/9/22:4 P.M
b1 1/10/22 acc3 m n 11/9/22:4 P.M
然后我将运行一个spark作业,将更改添加到我的delta表中,我的del塔表将看起来像:-
cust date acct_id f1 f2 source_date(dd/mm/yy:h)
b1 1/10/22 acc1 x y 9/9/22:1 P.M
b1 1/10/22 acc2 x y 9/9/22:1 P.M
b1 1/10/22 acc3 x y 9/9/22:1 P.M
b1 1/10/22 acc2 p y 10/9/22:2 P.M
b1 1/10/22 acc2 p q 11/9/22:4 P.M
b1 1/10/22 acc3 m n 11/9/22:4 P.M
现在我想使用spark提供以下查询:-
查询:获取cust=b1,日期=1/10/22,源日期=9/9/22:1 p.M 的数据
output:-
b1 1/10/22 acc1 x y 9/9/22:1 P.M
b1 1/10/22 acc2 x y 9/9/22:1 P.M
b1 1/10/22 acc3 x y 9/9/22:1 P.M
查询:获取cust=b1,日期=1/10/22,源日期=10/9/22:2 p.M 的数据
output:-
b1 1/10/22 acc1 x y 9/9/22:1 P.M
b1 1/10/22 acc2 p y 10/9/22:2 P.M
b1 1/10/22 acc3 x y 9/9/22:1 P.M
查询:获取cust=b1,日期=1/10/22,源日期=11/9/22:4 p.M 的数据
output:-
b1 1/10/22 acc1 x y 9/9/22:1 P.M
b1 1/10/22 acc2 p q 11/9/22:4 P.M
b1 1/10/22 acc3 m n 11/9/22:4 P.M
使用以下代码解决了此问题:-
Dataset result = data.filter(col("source_date").$less$eq("2022-09-10 02:00:00.0"))
.groupBy("cust", "date", "acct_id")
.agg(max("source_date").as("source_date"), last("f1").as("f1"), last("f2").as("f2"));