循环访问类型为 Array[DateType] 的 Spark 列,并查看数组中是否有连续两天



我有一个数据帧,它由 2 列组成。第一个是ID,第二个是与该ID相关联的几个日期的数组。

我需要遍历包含日期的列,并查看该数组中是否有连续 2 天。

+--------------------+--------------------+
|                  id|                date|
+--------------------+--------------------+
|1003                |[2018-02-12, 2018...|
|1003                |[2018-02-04, 2018...|
|1003                |[2018-02-05, 2018...|
|1003                |[2018-02-02, 2018...|
|1003                |[2018-02-28, 2018...|
|1003                |[2018-02-07, 2018...|
|1003                |[2018-02-13, 2018...|
|1003                |[2018-02-21, 2018...|
|1003                |[2018-02-15, 2018...|
|1003                |[2018-02-15, 2018...|
+--------------------+--------------------+

如果你使用的是Spark>=2.4,你可以在数组上使用高阶函数来做到这一点。下面是一个示例:

// example data
val df = Seq(
(1003, Array("2019-01-13", "2019-01-14", "2019-05-O3")), 
(1004, Array("2019-02-23", "2019-01-18", "2019-12-O6")), 
(1005, Array("2019-03-10", "2019-06-23", "2019-06-24")), 
(1006, Array("2019-04-11", "2019-04-18", "2019-14-19"))
).toDF("id","date")
// first we sort the dates in the array column
val sortedDatesDf = df.select(col("id"), array_sort(col("date")).alias("dates"))
// we apply transform, and exists function on the sorted array of dates
sortedDatesDf.withColumn("consecutive_dates", exists(transform(col("dates"), 
(x, i) => lit(date_add(x, lit(1)) === col("dates")(i+1))), x => x)
).show()

输出:

+----+------------------------------------+-----------------+
|id  |dates                               |consecutive_dates|
+----+------------------------------------+-----------------+
|1003|[2019-01-13, 2019-01-14, 2019-05-O3]|true             |
|1004|[2019-01-18, 2019-02-23, 2019-12-O6]|null             |
|1005|[2019-03-10, 2019-06-23, 2019-06-24]|true             |
|1006|[2019-04-11, 2019-04-18, 2019-14-19]|null             |
+----+------------------------------------+-----------------+

它是如何工作的?

  • 首先,我们对日期数组进行排序,以便我们可以比较 2 个连续的值。array_sort函数将按升序对输入数组进行排序。
  • transform函数采用排序数组和一个 lambda 函数 :(x, i) => Boolean。其中x实际值并将其i数组中的索引。因此,要知道两个日期是否连续,我们将一天添加到x并检查它是否等于数组中的下一个日期(索引i+1(。
  • 最后,我们使用exists函数和 lambda 函数x => Boolean检查转换后的数组中是否至少有一个值true(这意味着至少连续 2 个日期(,因为这些值是布尔类型,我们只需要x => x

没有高阶函数怎么做?

如果不能使用上述解决方案,则可以使用其他解决方案作为:

  • 创建UDF函数,该函数获取日期列表,如果包含两个连续的日期,则返回布尔值,然后将其与DF :df.withColumn("consecutive_dates", containsConsecurtiveDates(col("date")))一起使用。
  • 另一种解决方案是分解date列,然后使用一些 SQL 窗口函数(lagrow_number,..(来检测具有连续日期的 ID,最后按 ID 进行分组结果。

最新更新