这个用例可以通过滞后/Spark的任何其他功能来完成吗



我使用的是spark-2.4.1v。我的项目中有一个用例,对于每个日期(process_date(,我需要将当天的记录与前一天的记录相结合,并对该数据集执行某些其他操作。那么,如何准备数据集呢?我尝试了使用滞后函数,但没有取得多大成功。

对于上述用例,给定的数据如下:

+----------+----------+----+-------+------------+-----------+
|company_id|  gen_date|year|quarter|total_assets|create_date|
+----------+----------+----+-------+------------+-----------+
| 989856662|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856665|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856667|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856662|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+----------+----+-------+------------+-----------+

这里gen_date是关键列。对于每个gen_date,我需要获取其以前可用的gen_date记录。这些将作为集合一起处理,即对于process_ date 2019-01-02——它应该具有2019-01-02&2019-01-01类似于gen_ date的process_ date记录2018-12-30&其上一个gen_date,即2018-12-29,但在这里2018-12-29的gen_date记录不可用,因此应视为gen_date 2018-12-30记录。

在给定的集合中:

对于process_date 2019-01-02=>(gen_date 2019-01-02(的记录+(gen_date2019-01-01(的记录
对于process_date 2019-01-01=>(gen_date 2019-01-01(的记录+(gen_date2018-12-31(的记录
对于process_date 2018-12-31=>(gen_date 2018-12-31(的记录+(gen_date2018-12-30(的记录
对于process_date 2018.12-30=>(2018年12月30日(的记录+没有以前的日期记录。

输出应如下所示:

+----------+------------+----------+----+-------+------------+-----------+
|company_id|process_date|  gen_date|year|quarter|total_assets|create_date|
+----------+------------+----------+----+-------+------------+-----------+
| 989856662|  2019-01-02|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856662|  2019-01-02|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|  2019-01-01|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856662|  2019-01-01|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|  2019-01-01|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856665|  2019-01-01|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|  2019-01-01|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856667|  2019-01-01|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
| 989856662|  2018-12-30|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-30|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-30|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+------------+----------+----+-------+------------+-----------+

如何实现以上输出?

下面是相同的附加笔记本url。

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html

为了获得给定gen_datecompany_id的前一天详细信息,您可以使用以下spec的滞后函数,

val windowSpec  = Window.partitionBy("company_id").orderBy("gen_date") 
val intermediateDF = finDF
.withColumn("previous_gen_date", lag("gen_date",1).over(windowSpec))

上面的步骤将根据company_id和gen_date获得上一个生成日期,您可以将这些数据与原始数据连接起来,以获得相关的前一天数据。

val finalDF = intermediateDF.alias("a")
.join(finDF.alias("b"), col("a.company_id") === col("b.company_id") &&
col("a.previous_gen_date") === col("b.gen_date"), "left_outer")
.select(col("a.*"),
col("b.year").as("previous_gen_date_year"),
col("b.quarter").as("previous_gen_date_quarter"),
col("b.total_assets").as("previous_gen_date_total_assets"),
col("b.create_date").as("previous_gen_date_create_date")
)

上述联接将生成前一天的完整数据以及生成日期。

+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|company_id|gen_date  |year|quarter|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|989856662 |2018-12-30|2018|4      |3832.435058 |2019-09-11 |null             |null                  |null                     |null                          |null                         |
|989856662 |2018-12-31|2018|4      |3700.435058 |2019-09-11 |2018-12-30       |2018                  |4                        |3832.435058                   |2019-09-11                   |
|989856662 |2019-01-01|2019|1      |3800.435058 |2019-09-11 |2018-12-31       |2018                  |4                        |3700.435058                   |2019-09-11                   |
|989856662 |2019-01-02|2019|1      |3900.435058 |2019-09-11 |2019-01-01       |2019                  |1                        |3800.435058                   |2019-09-11                   |
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+

在这里,您的gen_date也可以充当process_date列,使用它可以比较任何操作的两天数据。

最新更新