如何在 Scala 中迭代列中的对



我有一个这样的数据框,从镶木地板文件导入:

| Store_id | Date_d_id  | 
|    0     | 23-07-2017 |  
|    0     | 26-07-2017 |   
|    0     | 01-08-2017 |   
|    0     | 25-08-2017 |  
|    1     | 01-01-2016 |  
|    1     | 04-01-2016 |   
|    1     | 10-01-2016 |   

我接下来要实现的是成对循环浏览每个客户的日期并获得日差。下面是它应该看起来像的样子:

| Store_id | Date_d_id  | Day_diff |
|    0     | 23-07-2017 |   null   |
|    0     | 26-07-2017 |    3     |
|    0     | 01-08-2017 |    6     |   
|    0     | 25-08-2017 |    24    |  
|    1     | 01-01-2016 |    null  |  
|    1     | 04-01-2016 |    3     |   
|    1     | 10-01-2016 |    6     | 

最后,我想将数据框减少到客户的平均日差:

| Store_id | avg_diff |
|    0     | 7.75     |  
|    1     |  3       |

我对 Scala 很陌生,我什至不知道从哪里开始。任何帮助都非常感谢!提前谢谢。

另外,我正在使用齐柏林飞艇笔记本

一种方法是使用 lag(Date) over Window partition 和 UDF 来计算连续行之间的天数差异,然后对数据帧进行分组以获得以天为单位的平均差异。 请注意,Date_d_id 将转换为 yyyy-mm-dd 格式,以便在 Window 分区中进行正确的字符串排序:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = Seq(
  (0, "23-07-2017"),
  (0, "26-07-2017"),
  (0, "01-08-2017"),
  (0, "25-08-2017"),
  (1, "01-01-2016"),
  (1, "04-01-2016"),
  (1, "10-01-2016")
).toDF("Store_id", "Date_d_id")
def daysDiff = udf(
  (d1: String, d2: String) => {
    import java.time.LocalDate 
    import java.time.temporal.ChronoUnit.DAYS
    DAYS.between(LocalDate.parse(d1), LocalDate.parse(d2))
  }
)
val df2 = df.
  withColumn( "Date_ymd",
    regexp_replace($"Date_d_id", """(d+)-(d+)-(d+)""", "$3-$2-$1")).
  withColumn( "Prior_date_ymd",
    lag("Date_ymd", 1).over(Window.partitionBy("Store_id").orderBy("Date_ymd"))).
  withColumn( "Days_diff",
    when($"Prior_date_ymd".isNotNull, daysDiff($"Prior_date_ymd", $"Date_ymd")).
    otherwise(0L))
df2.show
// +--------+----------+----------+--------------+---------+
// |Store_id| Date_d_id|  Date_ymd|Prior_date_ymd|Days_diff|
// +--------+----------+----------+--------------+---------+
// |       1|01-01-2016|2016-01-01|          null|        0|
// |       1|04-01-2016|2016-01-04|    2016-01-01|        3|
// |       1|10-01-2016|2016-01-10|    2016-01-04|        6|
// |       0|23-07-2017|2017-07-23|          null|        0|
// |       0|26-07-2017|2017-07-26|    2017-07-23|        3|
// |       0|01-08-2017|2017-08-01|    2017-07-26|        6|
// |       0|25-08-2017|2017-08-25|    2017-08-01|       24|
// +--------+----------+----------+--------------+---------+
val resultDF = df2.groupBy("Store_id").agg(avg("Days_diff").as("Avg_diff"))
resultDF.show
// +--------+--------+
// |Store_id|Avg_diff|
// +--------+--------+
// |       1|     3.0|
// |       0|    8.25|
// +--------+--------+
您可以使用

lag函数获取Window函数的上一个日期,然后执行一些操作以获取所需的最终dataframe

首先,Date_d_id列需要转换为包含时间戳才能使排序正常工作

import org.apache.spark.sql.functions._
val timestapeddf = df.withColumn("Date_d_id", from_unixtime(unix_timestamp($"Date_d_id", "dd-MM-yyyy")))

这应该给你的dataframe

+--------+-------------------+
|Store_id|          Date_d_id|
+--------+-------------------+
|       0|2017-07-23 00:00:00|
|       0|2017-07-26 00:00:00|
|       0|2017-08-01 00:00:00|
|       0|2017-08-25 00:00:00|
|       1|2016-01-01 00:00:00|
|       1|2016-01-04 00:00:00|
|       1|2016-01-10 00:00:00|
+--------+-------------------+

然后,您可以将lag函数应用于window函数,最终获得日期差异

import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("Store_id").orderBy("Date_d_id")
val laggeddf = timestapeddf.withColumn("Day_diff", when(lag("Date_d_id", 1).over(windowSpec).isNull, null).otherwise(datediff($"Date_d_id", lag("Date_d_id", 1).over(windowSpec))))

laggeddf应该是

+--------+-------------------+--------+
|Store_id|Date_d_id          |Day_diff|
+--------+-------------------+--------+
|0       |2017-07-23 00:00:00|null    |
|0       |2017-07-26 00:00:00|3       |
|0       |2017-08-01 00:00:00|6       |
|0       |2017-08-25 00:00:00|24      |
|1       |2016-01-01 00:00:00|null    |
|1       |2016-01-04 00:00:00|3       |
|1       |2016-01-10 00:00:00|6       |
+--------+-------------------+--------+

现在最后一步是使用 groupByaggregation 来求平均值

laggeddf.groupBy("Store_id")
  .agg(avg("Day_diff").as("avg_diff"))

应该给你

+--------+--------+
|Store_id|avg_diff|
+--------+--------+
|       0|    11.0|
|       1|     4.5|
+--------+--------+

现在,如果您想忽略空Day_diff那么您可以这样做

laggeddf.groupBy("Store_id")
          .agg((sum("Day_diff")/count($"Day_diff".isNotNull)).as("avg_diff"))

应该给你

+--------+--------+
|Store_id|avg_diff|
+--------+--------+
|       0|    8.25|
|       1|     3.0|
+--------+--------+

我希望答案对您有所帮助

相关内容

  • 没有找到相关文章

最新更新