我有一个spark数据帧,它有两列,一列是is id,另一列是secound id col_datetime。正如您可以看到下面给出的数据帧。如何根据coldatetime过滤数据帧以获得最旧的月份数据。我想动态地实现结果,因为我有20多个数据帧。
INPUT DF:-
import spark.implicits._
val data = Seq((1 , "2020-07-02 00:00:00.0"),(2 , "2020-08-02 00:00:00.0"),(3 , "2020-09-02 00:00:00.0"),(4 , "2020-10-02 00:00:00.0"),(5 , "2020-11-02 00:00:00.0"),(6 , "2020-12-02 00:00:00.0"),(7 , "2021-01-02 00:00:00.0"),(8 , "2021-02-02 00:00:00.0"),(9 , "2021-03-02 00:00:00.0"),(10, "2021-04-02 00:00:00.0"),(11, "2021-05-02 00:00:00.0"),(12, "2021-06-02 00:00:00.0"),(13, "2021-07-22 00:00:00.0"))
val dfFromData1 = data.toDF("ID","COL_DATETIME").withColumn("COL_DATETIME",to_timestamp(col("COL_DATETIME")))
+------+---------------------+
|ID |COL_DATETIME |
+------+---------------------+
|1 |2020-07-02 00:00:00.0|
|2 |2020-08-02 00:00:00.0|
|3 |2020-09-02 00:00:00.0|
|4 |2020-10-02 00:00:00.0|
|5 |2020-11-02 00:00:00.0|
|6 |2020-12-02 00:00:00.0|
|7 |2021-01-02 00:00:00.0|
|8 |2021-02-02 00:00:00.0|
|9 |2021-03-02 00:00:00.0|
|10 |2021-04-02 00:00:00.0|
|11 |2021-05-02 00:00:00.0|
|12 |2021-06-02 00:00:00.0|
|13 |2021-07-22 00:00:00.0|
+------+---------------------+
OUTPUT:-
DF1 : - Oldest month data
+------+---------------------+
|ID |COL_DATETIME |
+------+---------------------+
|1 |2020-07-02 00:00:00.0|
+------+---------------------+
DF2:- lastest months data after removing oldest month data from orginal DF.
+------+---------------------+
|ID |COL_DATETIME |
+------+---------------------+
|2 |2020-08-02 00:00:00.0|
|3 |2020-09-02 00:00:00.0|
|4 |2020-10-02 00:00:00.0|
|5 |2020-11-02 00:00:00.0|
|6 |2020-12-02 00:00:00.0|
|7 |2021-01-02 00:00:00.0|
|8 |2021-02-02 00:00:00.0|
|9 |2021-03-02 00:00:00.0|
|10 |2021-04-02 00:00:00.0|
|11 |2021-05-02 00:00:00.0|
|12 |2021-06-02 00:00:00.0|
|13 |2021-07-22 00:00:00.0|
+------+---------------------+
逻辑/方法:-
步骤1:-计算给定数据帧的col_datetime列的最小日期时间,并分配给mindate变量。假设我会得到
var mindate = "2020-07-02 00:00:00.0"
val mindate = dfFromData1.select(min("COL_DATETIME")).first()
print(mindate)
result:-
mindate : org.apache.spark.sql.Row = [2020-07-02 00:00:00.0]
[2020-07-02 00:00:00.0]
步骤2:-使用正念获取月底日期。我还没有为这部分编写代码来使用mindate获取enddatemonth。
Val enddatemonth = "2020-07-31 00:00:00.0"
Step3:-现在我可以使用enddatemonth变量根据条件过滤DF1和DF2中的spark数据帧。即使我试图根据mindate过滤数据帧,我也会收到错误
val DF1 = dfFromData1.where(col("COL_DATETIME") <= enddatemonth)
val DF2 = dfFromData1.where(col("COL_DATETIME") > enddatemonth)
Error : <console>:166: error: type mismatch;
found : org.apache.spark.sql.Row
required: org.apache.spark.sql.Column val DF1 = dfFromData1.where(col("COL_DATETIME" )<= mindate)
谢谢。。。!!
类似的方法,但我发现它更干净,只用于处理月份。
想法:就像我们用秒计算历元,用月计算
val dfWithEpochMonth = dfFromData1.
withColumn("year",year($"COL_DATETIME")).
withColumn("month",month($"COL_DATETIME")).
withColumn("epochMonth", (($"year" - 1970 - 1) * 12) + $"month")
现在你的df看起来像:
+---+-------------------+----+-----+----------+
| ID| COL_DATETIME|year|month|epochMonth|
+---+-------------------+----+-----+----------+
| 1|2020-07-02 00:00:00|2020| 7| 595|
| 2|2020-08-02 00:00:00|2020| 8| 596|
| 3|2020-09-02 00:00:00|2020| 9| 597|
| 4|2020-10-02 00:00:00|2020| 10| 598|
现在,您可以直接计算最小历元月份并进行筛选。
val minEpochMonth = dfWithEpochMonth.select(min("epochMonth")).first().apply(0).toString().toInt
val df1 = dfWithEpochMonth.where($"epochMonth" <= minEpochMonth)
val df2 = dfWithEpochMonth.where($"epochMonth" > minEpochMonth)
可以删除不必要的列。
要解决您的错误消息:
val mindate = dfFromData1.select(min("COL_DATETIME")).first()
val mindateString = mindate.apply(0).toString()
现在您可以使用mindateString进行过滤。