我正在尝试使用Scala和Spark过滤比较两个日期列的DataFrame。基于过滤后的DataFrame,在顶部运行计算以计算新列。简化的我的数据框架有以下模式:
|-- received_day: date (nullable = true)
|-- finished: int (nullable = true)
在此之上,我创建了两个新列t_start
和t_end
,用于过滤DataFrame。它们与原始列received_day
:有10天和20天的差异
val dfWithDates= df
.withColumn("t_end",date_sub(col("received_day"),10))
.withColumn("t_start",date_sub(col("received_day"),20))
我现在想要一个新的计算列,它指示每一行数据在t_start
到t_end
周期中有多少行数据帧。我想我可以通过以下方式实现这一点:
val dfWithCount = dfWithDates
.withColumn("cnt", lit(
dfWithDates.filter(
$"received_day".lt(col("t_end"))
&& $"received_day".gt(col("t_start"))).count()))
然而,这个计数只返回0,我认为问题出在我传递给lt
和gt
的参数中。
根据这个问题,我意识到我需要传递一个字符串值。如果我尝试使用像lt(lit("2018-12-15"))
这样的硬编码值,那么过滤就会起作用。所以我试着把我的专栏选到StringType
:
val dfWithDates= df
.withColumn("t_end",date_sub(col("received_day"),10).cast(DataTypes.StringType))
.withColumn("t_start",date_sub(col("received_day"),20).cast(DataTypes.StringType))
但是过滤器仍然返回一个空的dataFrame。我认为我没有正确处理数据类型。
我在Scala 2.11.0和Spark 2.0.2上运行。
$"received_day".lt(col("t_end")
,将每个reveived_day
值与当前行的t_end
值进行比较,而不是将整个数据帧进行比较。所以每次你都会得到零作为计数。你可以通过编写一个简单的udf来解决这个问题。以下是解决问题的方法:
创建样本输入数据集:
import org.apache.spark.sql.{Row, SparkSession}
import java.sql.Date
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq((Date.valueOf("2018-10-12"),1),
(Date.valueOf("2018-10-13"),1),
(Date.valueOf("2018-09-25"),1),
(Date.valueOf("2018-10-14"),1)).toDF("received_day", "finished")
val dfWithDates= df
.withColumn("t_start",date_sub(col("received_day"),20))
.withColumn("t_end",date_sub(col("received_day"),10))
dfWithDates.show()
+------------+--------+----------+----------+
|received_day|finished| t_start| t_end|
+------------+--------+----------+----------+
| 2018-10-12| 1|2018-09-22|2018-10-02|
| 2018-10-13| 1|2018-09-23|2018-10-03|
| 2018-09-25| 1|2018-09-05|2018-09-15|
| 2018-10-14| 1|2018-09-24|2018-10-04|
+------------+--------+----------+----------+
对于2018-09-25
,我们需要计数3
生成输出:
val count_udf = udf((received_day:Date) => {
(dfWithDates.filter((col("t_end").gt(s"$received_day")) && col("t_start").lt(s"$received_day")).count())
})
val dfWithCount = dfWithDates.withColumn("count",count_udf(col("received_day")))
dfWithCount.show()
+------------+--------+----------+----------+-----+
|received_day|finished| t_start| t_end|count|
+------------+--------+----------+----------+-----+
| 2018-10-12| 1|2018-09-22|2018-10-02| 0|
| 2018-10-13| 1|2018-09-23|2018-10-03| 0|
| 2018-09-25| 1|2018-09-05|2018-09-15| 3|
| 2018-10-14| 1|2018-09-24|2018-10-04| 0|
+------------+--------+----------+----------+-----+
为了使计算更快,我建议缓存dfWithDates
,因为每行都有相同操作的重复。
您可以使用DateTimeFormatter 将日期值强制转换为任何模式的字符串
import java.time.format.DateTimeFormatter
date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))