I have two dataframes, df1 and df2. Both have a "date" column, as shown below.
Structure of df1
+----------+
| date|
+----------+
|02-01-2015|
|02-02-2015|
|02-03-2015|
+----------+
Structure of df2
+---+-------+-----+----------+
| ID|feature|value| date|
+---+-------+-----+----------+
| 1|balance| 100|01-01-2015|
| 1|balance| 100|05-01-2015|
| 1|balance| 100|30-01-2015|
| 1|balance| 100|01-02-2015|
| 1|balance| 100|01-03-2015|
+---+-------+-----+----------+
For each row in df1's "date" column, I have to compare it against df2's "date" and fetch all rows from df2 whose date is earlier than the df1 date.
Say we take the first row of df1, 02-01-2015, and fetch all rows from df2 with a date earlier than 02-01-2015; that should produce output like this
+---+-------+-----+----------+
| ID|feature|value| date|
+---+-------+-----+----------+
| 1|balance| 100|01-01-2015|
+---+-------+-----+----------+
What is the best way to achieve this in Spark Scala? I have several hundred million rows. I wanted to use a window function in Spark, but windows are limited to a single dataframe.
This gets all the results in one new dataframe:
// assumes spark-shell; otherwise import spark.implicits._ and org.apache.spark.sql.functions._
val df1 = Seq(
"02-01-2015",
"02-02-2015",
"02-03-2015"
).toDF("date")
.withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))
val df2 = Seq(
(1, "balance", 100, "01-01-2015"),
(1, "balance", 100, "05-01-2015"),
(1, "balance", 100, "30-01-2015"),
(1, "balance", 100, "01-02-2015"),
(1, "balance", 100, "01-03-2015")
).toDF("ID", "feature", "value", "date")
.withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))
df1.join(
df2, df2("date") < df1("date"), "left"
).show()
+-------------------+---+-------+-----+-------------------+
| date| ID|feature|value| date|
+-------------------+---+-------+-----+-------------------+
|2015-01-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-01-05 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-01-30 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-02-01 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-01-05 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-01-30 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-02-01 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-03-01 00:00:00|
+-------------------+---+-------+-----+-------------------+
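This inequality join is essentially a cross join with a predicate, which is why it gets expensive at hundreds of millions of rows. As a sanity check of the semantics, here is a plain-Scala sketch (no Spark needed) using the same sample data, hard-coded as ISO date strings so they compare correctly lexicographically; the names are hypothetical:

```scala
// Plain-Scala sketch of the inequality join above (sample data only).
// ISO-formatted dates compare correctly as plain strings.
val dates1 = Seq("2015-01-02", "2015-02-02", "2015-03-02")
val rows2 = Seq(
  (1, "balance", 100, "2015-01-01"),
  (1, "balance", 100, "2015-01-05"),
  (1, "balance", 100, "2015-01-30"),
  (1, "balance", 100, "2015-02-01"),
  (1, "balance", 100, "2015-03-01")
)

// Every (d1, row2) pair where row2's date is strictly earlier than d1.
val joined = for {
  d1 <- dates1
  r2 <- rows2
  if r2._4 < d1
} yield (d1, r2)

println(joined.size) // 10 pairs, matching the 10-row output above
```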
Edit: To get the count of matching records from df2, do the following:
df1.join(
df2, df2("date") < df1("date"), "left"
)
.groupBy(df1("date"))
.count
.orderBy(df1("date"))
.show
+-------------------+-----+
| date|count|
+-------------------+-----+
|2015-01-02 00:00:00| 1|
|2015-02-02 00:00:00| 4|
|2015-03-02 00:00:00| 5|
+-------------------+-----+
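The count step boils down to: for each df1 date, count the df2 dates strictly before it. A plain-Scala sketch of that logic on the same sample dates (ISO strings, hypothetical names; no Spark required):

```scala
// Plain-Scala sketch of the groupBy/count step (sample data only).
val dates1 = Seq("2015-01-02", "2015-02-02", "2015-03-02")
val dates2 = Seq("2015-01-01", "2015-01-05", "2015-01-30", "2015-02-01", "2015-03-01")

// For each df1 date, count the df2 dates strictly before it.
val counts = dates1.map(d1 => d1 -> dates2.count(_ < d1))
counts.foreach(println)
// (2015-01-02,1)
// (2015-02-02,4)
// (2015-03-02,5)
```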
If you only want to compare one row of df1 against df2's date, then you should first select the expected row from df1
val oneRowDF1 = df1.select($"date".as("date2")).where($"date" === "02-01-2015")
Then you should join with your logic
df2.join(oneRowDF1, unix_timestamp(df2("date"), "dd-MM-yyyy") < unix_timestamp(oneRowDF1("date2"), "dd-MM-yyyy"))
.drop("date2")
which should give you
+---+-------+-----+----------+
|ID |feature|value|date |
+---+-------+-----+----------+
|1 |balance|100 |01-01-2015|
+---+-------+-----+----------+
Update
A join is costly, as it requires shuffling data between executors on different nodes.
You can simply use the filter function instead, as below
val oneRowDF1 = df1.select(unix_timestamp($"date", "dd-MM-yyyy").as("date2")).where($"date" === "02-01-2015")
df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < oneRowDF1.take(1)(0)(0))
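The filter variant reduces to pulling one scalar threshold out of the single-row dataframe (that is what take(1)(0)(0) does) and comparing every df2 row against it, with no shuffle. A plain-Scala sketch of that shape, with sample data and hypothetical names:

```scala
// Plain-Scala sketch of the filter approach: extract one threshold value,
// then filter the big collection against it (no join, hence no shuffle).
val dates2 = Seq("2015-01-01", "2015-01-05", "2015-01-30", "2015-02-01", "2015-03-01")
val threshold = "2015-01-02" // the single df1 row, already normalized to ISO form

val result = dates2.filter(_ < threshold)
println(result) // List(2015-01-01)
```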
I hope the answer is helpful.