I'm struggling to write Spark Scala code that fills in the rows whose coverage is empty, using a self-join with conditions.
Here is the data:
+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
| 1 | 2020-09-01 | | 0.128 |
| 1 | 2020-09-03 | 0 | 0.358 |
| 1 | 2020-09-04 | 0 | 0.035 |
| 1 | 2020-09-05 | | |
| 1 | 2020-09-06 | | |
| 1 | 2020-09-19 | | |
| 1 | 2020-09-12 | | |
| 1 | 2020-09-18 | | |
| 1 | 2020-09-11 | | |
| 1 | 2020-09-16 | | |
| 1 | 2020-09-21 | 13 | 0.554 |
| 1 | 2020-09-23 | | |
| 1 | 2020-09-30 | | |
+----+--------------+----------+--------+
Expected result:
+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
| 1 | 2020-09-01 | -1 | 0.128 |
| 1 | 2020-09-03 | 0 | 0.358 |
| 1 | 2020-09-04 | 0 | 0.035 |
| 1 | 2020-09-05 | 0 | |
| 1 | 2020-09-06 | 0 | |
| 1 | 2020-09-19 | 0 | |
| 1 | 2020-09-12 | 0 | |
| 1 | 2020-09-18 | 0 | |
| 1 | 2020-09-11 | 0 | |
| 1 | 2020-09-16 | 0 | |
| 1 | 2020-09-21 | 13 | 0.554 |
| 1 | 2020-09-23 | -1 | |
| 1 | 2020-09-30 | -1 | |
+----+--------------+----------+--------+
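What I'm trying to do:
For each distinct ID, with its rows sorted by date (that is, the DataFrame partitioned by ID):
Use case: a row whose coverage column is empty. Let's call it rowEmptycoverage:
- Find the first row in the DF with date_in_days > rowEmptycoverage.date_in_days and coverage >= 0. Let's call it rowFirstDateGreater.
- Then, if rowFirstDateGreater.values > 500, set rowEmptycoverage.coverage to 0. Otherwise set it to -1.
I get a bit lost putting all of this together once the join is involved...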
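I assume you mean values > 0.500 rather than values > 500. The logic is also still a bit unclear: here I assume that you search in order of the date_in_days column, not in the order of the rows in the dataframe.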
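To pin down the rule as I read it, here is a minimal plain-Scala sketch with no Spark involved. It assumes the 0.500 threshold, ordering by date_in_days, and "next row with a non-null coverage" in place of "coverage >= 0". The names Rec, CoverageRule, fill and sample are hypothetical and only serve the illustration.

```scala
import java.time.LocalDate

// Hypothetical record type mirroring the four columns of the sample data.
final case class Rec(id: Int, date: LocalDate, coverage: Option[Int], values: Option[Double])

object CoverageRule {
  // Assumption: the threshold is 0.500, not the 500 written in the question.
  val Threshold = 0.5

  // For every row with an empty coverage, look at the first later row (same id,
  // ordered by date) whose coverage is defined: 0 if its values exceed the
  // threshold, -1 otherwise (including when no such row exists).
  def fill(rows: Seq[Rec]): Seq[Rec] =
    rows.groupBy(_.id).values.toSeq.flatMap { group =>
      val sorted = group.sortBy(_.date.toEpochDay)
      sorted.map { r =>
        r.coverage match {
          case Some(_) => r // keep existing coverage untouched
          case None =>
            val next = sorted.find(o => o.date.isAfter(r.date) && o.coverage.isDefined)
            val filled = if (next.flatMap(_.values).exists(_ > Threshold)) 0 else -1
            r.copy(coverage = Some(filled))
        }
      }
    }.sortBy(r => (r.id, r.date.toEpochDay))

  // A shortened version of the sample data from the question.
  val sample: Seq[Rec] = Seq(
    Rec(1, LocalDate.parse("2020-09-01"), None, Some(0.128)),
    Rec(1, LocalDate.parse("2020-09-03"), Some(0), Some(0.358)),
    Rec(1, LocalDate.parse("2020-09-05"), None, None),
    Rec(1, LocalDate.parse("2020-09-21"), Some(13), Some(0.554)),
    Rec(1, LocalDate.parse("2020-09-23"), None, None)
  )

  def main(args: Array[String]): Unit =
    fill(sample).foreach(println)
}
```

On this shortened sample the filled coverages come out as -1, 0, 0, 13, -1, which matches the pattern of the expected result. If your rule differs (for example a strict coverage >= 0 filter), only the find predicate needs to change.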
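In any case, we can refine the solution to fit your exact needs. The general idea is to use a Window to get the next date for which the coverage is not null, check whether values meets the required criterion, and update coverage accordingly.
It goes as follows: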
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named spark

val win = Window.partitionBy("ID").orderBy("date_in_days")
  .rangeBetween(Window.currentRow, Window.unboundedFollowing)

df
  // creating a struct binding coverage and values
  .withColumn("cov_str", when($"coverage".isNull, lit(null))
    .otherwise(struct($"coverage", $"values")))
  // finding the first row (starting from the current date, in order of
  // date_in_days) for which the coverage is not null
  .withColumn("next_cov_str", first($"cov_str", ignoreNulls = true) over win)
  // updating coverage. We keep the original value if not null, put 0 if values
  // meets the criteria (that you can change) and -1 otherwise.
  .withColumn("coverage", coalesce(
    $"coverage",
    when($"next_cov_str.values" > 0.500, lit(0)),
    lit(-1)
  ))
  .show(false)
+---+-------------------+--------+------+-----------+------------+
|ID |date_in_days |coverage|values|cov_str |next_cov_str|
+---+-------------------+--------+------+-----------+------------+
|1 |2020-09-01 00:00:00|-1 |0.128 |null |[0, 0.358] |
|1 |2020-09-03 00:00:00|0 |0.358 |[0, 0.358] |[0, 0.358] |
|1 |2020-09-04 00:00:00|0 |0.035 |[0, 0.035] |[0, 0.035] |
|1 |2020-09-05 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-06 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-11 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-12 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-16 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-18 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-19 00:00:00|0 |null |null |[13, 0.554] |
|1 |2020-09-21 00:00:00|13 |0.554 |[13, 0.554]|[13, 0.554] |
|1 |2020-09-23 00:00:00|-1 |null |null |null |
|1 |2020-09-30 00:00:00|-1 |null |null |null |
+---+-------------------+--------+------+-----------+------------+
You can then use drop("cov_str", "next_cov_str"), but I left them in here for clarity.
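For convenience, the same window logic can be wrapped in a small helper that drops the two intermediate columns at the end. This is only a sketch under the same assumptions as above (0.500 threshold, ordering by date_in_days). The names FillCoverageDemo, fillCoverage, sampleDf and the threshold parameter are hypothetical, and the sample is a shortened version of your data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object FillCoverageDemo {
  // Hypothetical helper; `threshold` defaults to the assumed 0.500.
  def fillCoverage(df: DataFrame, threshold: Double = 0.5): DataFrame = {
    val win = Window.partitionBy("ID").orderBy("date_in_days")
      .rangeBetween(Window.currentRow, Window.unboundedFollowing)

    df
      // struct of (coverage, values), null when coverage is null
      .withColumn("cov_str",
        when(col("coverage").isNotNull, struct(col("coverage"), col("values"))))
      // first non-null struct from the current date onwards
      .withColumn("next_cov_str", first(col("cov_str"), ignoreNulls = true).over(win))
      // keep coverage if present, else 0 when the criterion holds, else -1
      .withColumn("coverage", coalesce(
        col("coverage"),
        when(col("next_cov_str.values") > threshold, lit(0)),
        lit(-1)))
      .drop("cov_str", "next_cov_str")
  }

  // Shortened sample of the data in the question.
  def sampleDf(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(Int, String, Option[Int], Option[Double])](
      (1, "2020-09-01", None, Some(0.128)),
      (1, "2020-09-03", Some(0), Some(0.358)),
      (1, "2020-09-04", Some(0), Some(0.035)),
      (1, "2020-09-05", None, None),
      (1, "2020-09-21", Some(13), Some(0.554)),
      (1, "2020-09-23", None, None)
    ).toDF("ID", "date_in_days", "coverage", "values")
      .withColumn("date_in_days", to_date($"date_in_days"))
  }

  def main(args: Array[String]): Unit = {
    // local[1] is an assumption for a quick local run
    val spark = SparkSession.builder().master("local[1]").appName("fill-coverage").getOrCreate()
    fillCoverage(sampleDf(spark)).orderBy("ID", "date_in_days").show(false)
    spark.stop()
  }
}
```

Passing the threshold as a parameter makes it easy to switch back to 500 if that is really what you meant.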