Spark Scala: fill in null values based on the result of a self-join query on a DataFrame



I'm having a hard time writing Spark Scala code that fills in the rows whose coverage is empty, using a self-join with conditions.

Here is the data:

+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
|  1 | 2020-09-01   |          | 0.128  |
|  1 | 2020-09-03   |        0 | 0.358  |
|  1 | 2020-09-04   |        0 | 0.035  |
|  1 | 2020-09-05   |          |        |
|  1 | 2020-09-06   |          |        |
|  1 | 2020-09-19   |          |        |
|  1 | 2020-09-12   |          |        |
|  1 | 2020-09-18   |          |        |
|  1 | 2020-09-11   |          |        |
|  1 | 2020-09-16   |          |        |
|  1 | 2020-09-21   |       13 | 0.554  |
|  1 | 2020-09-23   |          |        |
|  1 | 2020-09-30   |          |        |
+----+--------------+----------+--------+

Expected result:

+----+--------------+----------+--------+
| ID | date_in_days | coverage | values |
+----+--------------+----------+--------+
|  1 | 2020-09-01   |       -1 | 0.128  |
|  1 | 2020-09-03   |        0 | 0.358  |
|  1 | 2020-09-04   |        0 | 0.035  |
|  1 | 2020-09-05   |        0 |        |
|  1 | 2020-09-06   |        0 |        |
|  1 | 2020-09-19   |        0 |        |
|  1 | 2020-09-12   |        0 |        |
|  1 | 2020-09-18   |        0 |        |
|  1 | 2020-09-11   |        0 |        |
|  1 | 2020-09-16   |        0 |        |
|  1 | 2020-09-21   |       13 | 0.554  |
|  1 | 2020-09-23   |       -1 |        |
|  1 | 2020-09-30   |       -1 |        |
+----+--------------+----------+--------+

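What I want to do:

For each distinct ID, with its rows ordered by date (i.e. the DataFrame partitioned by ID):

Use case: a row whose coverage column is empty; call it rowEmptycoverage:

  1. Find the first row in the DF with date_in_days > rowEmptycoverage.date_in_days and coverage >= 0. Call it rowFirstDateGreater.
  2. Then, if rowFirstDateGreater.values > 500, set rowEmptycoverage.coverage to 0; otherwise set it to -1.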

I get a bit lost in the mix when it comes to joins...
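A minimal sketch of the two steps in the question, written against plain Scala collections rather than Spark so the rule itself can be checked directly. The `Rec` case class, the `RuleSketch` object and the 0.5 threshold (the answer below assumes that 500 was meant to be 0.500) are illustrative assumptions, not part of the original. Dates are kept as ISO strings, which compare in date order.

```scala
object RuleSketch {
  // Hypothetical row model; None stands for an empty cell.
  final case class Rec(id: Int, date: String, coverage: Option[Int], values: Option[Double])

  // Assumed threshold: 0.5 (the question writes 500).
  val Threshold = 0.5

  def fill(rows: Seq[Rec]): Seq[Rec] =
    rows.map { r =>
      r.coverage match {
        case Some(_) => r // non-empty coverage is kept as-is
        case None =>
          // Step 1: first row of the same ID with a later date and coverage >= 0
          val firstGreater = rows
            .filter(o => o.id == r.id && o.date.compareTo(r.date) > 0 && o.coverage.exists(_ >= 0))
            .sortBy(_.date)
            .headOption
          // Step 2: 0 if that row's values exceed the threshold, otherwise -1
          // (also -1 when no such row exists)
          val filled = if (firstGreater.flatMap(_.values).exists(_ > Threshold)) 0 else -1
          r.copy(coverage = Some(filled))
      }
    }

  // The question's sample data, in its original (unsorted) order.
  val sample: Seq[Rec] = Seq(
    Rec(1, "2020-09-01", None, Some(0.128)),
    Rec(1, "2020-09-03", Some(0), Some(0.358)),
    Rec(1, "2020-09-04", Some(0), Some(0.035)),
    Rec(1, "2020-09-05", None, None),
    Rec(1, "2020-09-06", None, None),
    Rec(1, "2020-09-19", None, None),
    Rec(1, "2020-09-12", None, None),
    Rec(1, "2020-09-18", None, None),
    Rec(1, "2020-09-11", None, None),
    Rec(1, "2020-09-16", None, None),
    Rec(1, "2020-09-21", Some(13), Some(0.554)),
    Rec(1, "2020-09-23", None, None),
    Rec(1, "2020-09-30", None, None)
  )

  def main(args: Array[String]): Unit =
    fill(sample).foreach(r => println(s"${r.date} ${r.coverage.get}"))
}
```

This brute-force version rescans the rows of the same ID for every empty cell, so it is quadratic per ID. It is handy for confirming the expected table, not for running over a large DataFrame.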

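I assume you mean values > 0.500 rather than values > 500. The logic is also still somewhat unclear; here I assume you are searching in the order of the date_in_days column, not in the row order of the DataFrame.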

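In any case, the solution can be adapted to your exact needs. The general idea is to use a Window to find, for each row, the next date whose coverage is not null, check whether its values meet the required criterion, and update coverage accordingly.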

Something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Frame: from the current row to the end of the ID's partition, ordered by date
val win = Window.partitionBy("ID").orderBy("date_in_days")
  .rangeBetween(Window.currentRow, Window.unboundedFollowing)

df
  // creating a struct binding coverage and values (null when coverage is null)
  .withColumn("cov_str", when(col("coverage").isNull, lit(null))
    .otherwise(struct(col("coverage"), col("values"))))
  // finding the first row (starting from the current date, in order of
  // date_in_days) for which the coverage is not null
  .withColumn("next_cov_str", first(col("cov_str"), ignoreNulls = true).over(win))
  // updating coverage. We keep the original value if not null, put 0 if values
  // meets the criteria (that you can change) and -1 otherwise.
  .withColumn("coverage", coalesce(
    col("coverage"),
    when(col("next_cov_str.values") > 0.500, lit(0)),
    lit(-1)
  ))
  .show(false)
+---+-------------------+--------+------+-----------+------------+
|ID |date_in_days       |coverage|values|cov_str    |next_cov_str|
+---+-------------------+--------+------+-----------+------------+
|1  |2020-09-01 00:00:00|-1      |0.128 |null       |[0, 0.358]  |
|1  |2020-09-03 00:00:00|0       |0.358 |[0, 0.358] |[0, 0.358]  |
|1  |2020-09-04 00:00:00|0       |0.035 |[0, 0.035] |[0, 0.035]  |
|1  |2020-09-05 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-06 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-11 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-12 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-16 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-18 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-19 00:00:00|0       |null  |null       |[13, 0.554] |
|1  |2020-09-21 00:00:00|13      |0.554 |[13, 0.554]|[13, 0.554] |
|1  |2020-09-23 00:00:00|-1      |null  |null       |null        |
|1  |2020-09-30 00:00:00|-1      |null  |null       |null        |
+---+-------------------+--------+------+-----------+------------+

You can then remove the helper columns with drop("cov_str", "next_cov_str"); I left them in here for clarity.
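To see what the window step computes without a Spark cluster, here is a rough plain-Scala emulation of `first(..., ignoreNulls = true)` over a frame running from the current row to unbounded following. Within each ID, rows are sorted by date and scanned from the right, carrying the nearest non-null (coverage, values) pair. `WindowSketch`, `Rec`, `CovStr` and the sample are hypothetical names for illustration. The sketch assumes dates are unique within an ID; a real RANGE frame would also include rows tied on the same date.

```scala
object WindowSketch {
  // Hypothetical row model; None stands for a null cell.
  final case class Rec(id: Int, date: String, coverage: Option[Int], values: Option[Double])

  type CovStr = (Int, Option[Double]) // plays the role of struct(coverage, values)

  // Pairs each row with the first non-null CovStr at or after its date, per ID.
  def withNextCovStr(rows: Seq[Rec]): Seq[(Rec, Option[CovStr])] =
    rows.groupBy(_.id).values.toSeq.flatMap { group =>
      val sorted = group.sortBy(_.date)
      // scanRight walks from the last date backwards, carrying the nearest
      // non-null struct; the result has one extra trailing element (the seed).
      val carried = sorted.scanRight(Option.empty[CovStr]) { (r, acc) =>
        r.coverage.map(c => (c, r.values)).orElse(acc)
      }
      sorted.zip(carried) // zip drops the extra seed element
    }

  // Mirrors coalesce(coverage, when(next_cov_str.values > threshold, 0), -1).
  def fill(rows: Seq[Rec], threshold: Double = 0.5): Seq[Rec] =
    withNextCovStr(rows).map { case (r, next) =>
      val passes = next.flatMap(_._2).exists(_ > threshold)
      val cov = r.coverage.orElse(if (passes) Some(0) else None).getOrElse(-1)
      r.copy(coverage = Some(cov))
    }

  // The question's sample data, in its original (unsorted) order.
  val sample: Seq[Rec] = Seq(
    Rec(1, "2020-09-01", None, Some(0.128)),
    Rec(1, "2020-09-03", Some(0), Some(0.358)),
    Rec(1, "2020-09-04", Some(0), Some(0.035)),
    Rec(1, "2020-09-05", None, None),
    Rec(1, "2020-09-06", None, None),
    Rec(1, "2020-09-19", None, None),
    Rec(1, "2020-09-12", None, None),
    Rec(1, "2020-09-18", None, None),
    Rec(1, "2020-09-11", None, None),
    Rec(1, "2020-09-16", None, None),
    Rec(1, "2020-09-21", Some(13), Some(0.554)),
    Rec(1, "2020-09-23", None, None),
    Rec(1, "2020-09-30", None, None)
  )

  def main(args: Array[String]): Unit =
    fill(sample).foreach(r => println(s"${r.date} ${r.coverage.get}"))
}
```

After the sort, each row is visited once by the backward scan. This is the property that lets the window approach avoid an explicit self-join.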
