How to get the oldest month's data using a datetime column in a Spark DataFrame (Scala)



I have a Spark DataFrame with two columns: the first is ID and the second is COL_DATETIME, as shown in the DataFrame below. How can I filter the DataFrame based on COL_DATETIME to get the oldest month's data? I want to do this dynamically, because I have more than 20 DataFrames.

INPUT DF:-
import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq((1 , "2020-07-02 00:00:00.0"),(2 , "2020-08-02 00:00:00.0"),(3 , "2020-09-02 00:00:00.0"),(4 , "2020-10-02 00:00:00.0"),(5 , "2020-11-02 00:00:00.0"),(6 , "2020-12-02 00:00:00.0"),(7 , "2021-01-02 00:00:00.0"),(8 , "2021-02-02 00:00:00.0"),(9 , "2021-03-02 00:00:00.0"),(10, "2021-04-02 00:00:00.0"),(11, "2021-05-02 00:00:00.0"),(12, "2021-06-02 00:00:00.0"),(13, "2021-07-22 00:00:00.0"))
val dfFromData1 = data.toDF("ID","COL_DATETIME").withColumn("COL_DATETIME", to_timestamp(col("COL_DATETIME")))
+------+---------------------+
|ID    |COL_DATETIME         |
+------+---------------------+
|1     |2020-07-02 00:00:00.0|
|2     |2020-08-02 00:00:00.0|
|3     |2020-09-02 00:00:00.0|
|4     |2020-10-02 00:00:00.0|
|5     |2020-11-02 00:00:00.0|
|6     |2020-12-02 00:00:00.0|
|7     |2021-01-02 00:00:00.0|
|8     |2021-02-02 00:00:00.0|
|9     |2021-03-02 00:00:00.0|
|10    |2021-04-02 00:00:00.0|
|11    |2021-05-02 00:00:00.0|
|12    |2021-06-02 00:00:00.0|
|13    |2021-07-22 00:00:00.0|
+------+---------------------+
OUTPUT:-
DF1:- Oldest month's data
+------+---------------------+
|ID    |COL_DATETIME         |
+------+---------------------+
|1     |2020-07-02 00:00:00.0|
+------+---------------------+
DF2:- Latest months' data after removing the oldest month's data from the original DF.
+------+---------------------+
|ID    |COL_DATETIME         |
+------+---------------------+
|2     |2020-08-02 00:00:00.0|
|3     |2020-09-02 00:00:00.0|
|4     |2020-10-02 00:00:00.0|
|5     |2020-11-02 00:00:00.0|
|6     |2020-12-02 00:00:00.0|
|7     |2021-01-02 00:00:00.0|
|8     |2021-02-02 00:00:00.0|
|9     |2021-03-02 00:00:00.0|
|10    |2021-04-02 00:00:00.0|
|11    |2021-05-02 00:00:00.0|
|12    |2021-06-02 00:00:00.0|
|13    |2021-07-22 00:00:00.0|
+------+---------------------+

Logic / Approach:-

Step 1:- Compute the minimum datetime of the COL_DATETIME column for the given DataFrame and assign it to a mindate variable. Suppose I get

var mindate = "2020-07-02 00:00:00.0" 
val mindate = dfFromData1.select(min("COL_DATETIME")).first()
print(mindate)
result:- 
mindate : org.apache.spark.sql.Row = [2020-07-02 00:00:00.0]
[2020-07-02 00:00:00.0]

Step 2:- Get the month-end date from mindate. I haven't written the code for this part yet to derive enddatemonth from mindate.

val enddatemonth = "2020-07-31 00:00:00.0"
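
A minimal sketch of this step, assuming Spark's built-in last_day function and the mindate Row from Step 1 (the variable names are just illustrative):

import org.apache.spark.sql.functions.{last_day, lit}

// first() returns a Row, so pull the actual timestamp out of it
val minTs = mindate.getTimestamp(0)

// last_day gives the last calendar day of that month (a DATE); cast it back
// to a timestamp so it can be compared against COL_DATETIME in Step 3
val enddatemonth = dfFromData1
  .select(last_day(lit(minTs)).cast("timestamp"))
  .first()
  .getTimestamp(0)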

Step 3:- Now I can use the enddatemonth variable to filter the Spark DataFrame into DF1 and DF2 based on a condition. Even when I try to filter the DataFrame based on mindate, I get an error:

val DF1 = dfFromData1.where(col("COL_DATETIME") <= enddatemonth)
val DF2 = dfFromData1.where(col("COL_DATETIME") > enddatemonth)
Error:
<console>:166: error: type mismatch;
 found   : org.apache.spark.sql.Row
 required: org.apache.spark.sql.Column
       val DF1 = dfFromData1.where(col("COL_DATETIME") <= mindate)

Thanks...!!

A similar approach, but one I find cleaner since it only has to deal with months.

Idea: just like we compute epochs in seconds, compute them in months.

// epochMonth maps each (year, month) pair to a unique, monotonically increasing integer
val dfWithEpochMonth = dfFromData1.
  withColumn("year", year($"COL_DATETIME")).
  withColumn("month", month($"COL_DATETIME")).
  withColumn("epochMonth", (($"year" - 1970 - 1) * 12) + $"month")

Now your df looks like:

+---+-------------------+----+-----+----------+
| ID|       COL_DATETIME|year|month|epochMonth|
+---+-------------------+----+-----+----------+
|  1|2020-07-02 00:00:00|2020|    7|       595|
|  2|2020-08-02 00:00:00|2020|    8|       596|
|  3|2020-09-02 00:00:00|2020|    9|       597|
|  4|2020-10-02 00:00:00|2020|   10|       598|

Now you can directly compute the minimum epoch month and filter on it.

val minEpochMonth = dfWithEpochMonth.select(min("epochMonth")).first().apply(0).toString().toInt
val df1 = dfWithEpochMonth.where($"epochMonth" <= minEpochMonth)
val df2 = dfWithEpochMonth.where($"epochMonth" > minEpochMonth)

You can drop the unnecessary columns afterwards.
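
For instance (the helper-column names match the snippet above):

val df1Clean = df1.drop("year", "month", "epochMonth")
val df2Clean = df2.drop("year", "month", "epochMonth")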


To address your error message:

val mindate = dfFromData1.select(min("COL_DATETIME")).first()
val mindateString = mindate.apply(0).toString()

Now you can use mindateString for filtering.
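
A minimal sketch of that filtering, using Spark's built-in trunc to compare calendar months directly, so no explicit month-end value is needed (names are illustrative):

import org.apache.spark.sql.functions.{col, lit, to_timestamp, trunc}

// truncate both sides to the first day of their month and compare
val minMonth = trunc(to_timestamp(lit(mindateString)), "month")

val DF1 = dfFromData1.where(trunc(col("COL_DATETIME"), "month") === minMonth)  // oldest month only
val DF2 = dfFromData1.where(trunc(col("COL_DATETIME"), "month") =!= minMonth)  // everything after it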
