如何计算满足最后一个条件之间的天数?



current df:

df = spark.createDataFrame([
("2020-01-12","d1",0),
("2020-01-12","d2",0),
("2020-01-13","d3",0),
("2020-01-14","d4",1), 
("2020-01-15","d5",0),
("2020-01-15","d6",0),
("2020-01-16","d7",0),
("2020-01-17","d8",0),
("2020-01-18","d9",1),
("2020-01-19","d10",0),
("2020-01-20","d11",0),], 
['date', 'device', 'condition'])
df.show()
+----------+------+---------+
|      date|device|condition|
+----------+------+---------+
|2020-01-12|    d1|        0|
|2020-01-12|    d2|        0|
|2020-01-13|    d3|        0|
|2020-01-14|    d4|        1|
|2020-01-15|    d5|        0|
|2020-01-15|    d6|        0|
|2020-01-16|    d7|        0|
|2020-01-17|    d8|        0|
|2020-01-18|    d9|        1|
|2020-01-19|   d10|        0|
|2020-01-20|   d11|        0|
+----------+------+---------+

所需输出 DF:

want_df = spark.createDataFrame([
("2020-01-12","d1",0,0),
("2020-01-12","d2",0,0),
("2020-01-13","d3",0,1),
("2020-01-14","d4",1,2), 
("2020-01-15","d5",0,1),
("2020-01-15","d6",0,1),
("2020-01-16","d7",0,2),
("2020-01-17","d8",0,3),
("2020-01-18","d9",1,4),
("2020-01-19","d10",0,1),
("2020-01-20","d11",0,2),], 
['date', 'device', 'condition', 'life'])
want_df.show()
+----------+------+---------+----+
|      date|device|condition|life|
+----------+------+---------+----+
|2020-01-12|    d1|        0|   0|
|2020-01-12|    d2|        0|   0|
|2020-01-13|    d3|        0|   1|
|2020-01-14|    d4|        1|   2|
|2020-01-15|    d5|        0|   1|
|2020-01-15|    d6|        0|   1|
|2020-01-16|    d7|        0|   2|
|2020-01-17|    d8|        0|   3|
|2020-01-18|    d9|        1|   4|
|2020-01-19|   d10|        0|   1|
|2020-01-20|   d11|        0|   2|
+----------+------+---------+----+

目标是计算到condition=1的日期差(#天(,然后从满足最后一个条件开始,日期差重置为#天。life是尝试计算的列。知道如何计算吗?Window还是lag

这是一种可以通过添加一些临时行来简化的问题(我们标记它们,然后稍后删除它们(

from pyspark.sql import Window
from pyspark.sql.functions import lit, lag, sum as fsum, first, datediff

(1( 首先,创建一个新的数据帧 df1,它复制条件 == 1 但设置其条件 = 0 和标志 = 1 的所有行,将生成的数据帧与原始数据帧(set 标志 = 0(联合:

df1 = df.withColumn('flag', lit(0)).union(
df.where('condition = 1').withColumn('condition', lit(0)).withColumn('flag', lit(1))
)

(2(然后,设置以下两个窗口规范,使用w1帮助创建一个子组标签g将所有连续的行分组,直到条件从1切换到0。 将flag添加到orderBy((中,以便新添加的行位于条件= 1的相应行的正后面,并分组到下一个组标签中。

w1 = Window.partitionBy(lit(0)).orderBy('date', 'flag')
w2 = Window.partitionBy(lit(0), 'g').orderBy('date', 'flag')

注意:如果您有一个庞大的数据帧,您可能希望将lit(0)更改为一些实际或计算列,以避免 Spark 将所有行移动到单个分区上。更新:根据注释,数据帧是单个时间序列,可以加载到单个分区上,因此使用lit(0)应该就足够了。

(3(使用w1上的滞后求和函数找到子组标签"g",然后使用WindowSpecw2计算同一组中的first_date。 此日期用于计算"寿命"列:

df2 = df1.withColumn('g', fsum((lag('condition').over(w1) == 1).astype('int')).over(w1)) 
.withColumn('first_date', first('date').over(w2)) 
.withColumn('life', datediff('date','first_date'))
df2.show()
+----------+------+---------+----+---+----------+----+
|      date|device|condition|flag|  g|first_date|life|
+----------+------+---------+----+---+----------+----+
|2020-01-12|    d1|        0|   0|  0|2020-01-12|   0|
|2020-01-12|    d2|        0|   0|  0|2020-01-12|   0|
|2020-01-13|    d3|        0|   0|  0|2020-01-12|   1|
|2020-01-14|    d4|        1|   0|  0|2020-01-12|   2|
|2020-01-14|    d4|        0|   1|  1|2020-01-14|   0|
|2020-01-15|    d5|        0|   0|  1|2020-01-14|   1|
|2020-01-15|    d6|        0|   0|  1|2020-01-14|   1|
|2020-01-16|    d7|        0|   0|  1|2020-01-14|   2|
|2020-01-17|    d8|        0|   0|  1|2020-01-14|   3|
|2020-01-18|    d9|        1|   0|  1|2020-01-14|   4|
|2020-01-18|    d9|        0|   1|  2|2020-01-18|   0|
|2020-01-19|   d10|        0|   0|  2|2020-01-18|   1|
|2020-01-20|   d11|        0|   0|  2|2020-01-18|   2|
+----------+------+---------+----+---+----------+----+

(4( 删除临时行和列以获取最终数据帧:

df_new = df2.filter('flag = 0').drop('first_date', 'g', 'flag')
df_new.show()
+----------+------+---------+----+
|      date|device|condition|life|
+----------+------+---------+----+
|2020-01-12|    d1|        0|   0|
|2020-01-12|    d2|        0|   0|
|2020-01-13|    d3|        0|   1|
|2020-01-14|    d4|        1|   2|
|2020-01-15|    d5|        0|   1|
|2020-01-15|    d6|        0|   1|
|2020-01-16|    d7|        0|   2|
|2020-01-17|    d8|        0|   3|
|2020-01-18|    d9|        1|   4|
|2020-01-19|   d10|        0|   1|
|2020-01-20|   d11|        0|   2|
+----------+------+---------+----+

我尝试从不同的方式提供,这更接近标准的sql方言,但仍然使用pyspark语法并关注性能影响。

from pyspark.sql import Window
from pyspark.sql.functions import col, when, lit, lag, min, max, datediff

选择条件等于 1 的日期范围, 然后使用联合函数与最大日期值组合。

w = Window.partitionBy('date')
dateRange = df.select(df.date).where(df.condition == 1)
.union(df.select(max(df.date))).distinct()
.orderBy('date')
.withColumn('lastDate', lag(col('date').over(w))
.select(when(col('lastDate').isNull(), lit('1970-01-01')).otherwise(col('lastDate')).alias('lastDate'), col('date').alias('toDate'))

通过将df与日期范围连接来选择日期范围和第一个最小日期, 然后进行分组并计算最小日期值。

dateRange1st = df.join(dateRange, df.date > dateRange.lastDate & df.date <= dateRange.toDate, 'inner').groupBy(dateRange.lastDate, dateRange.toDate).agg(min(df.date).alias('frDate'))

通过将日期范围(1st(加入df来选择结果, 有关帮助日期过滤并找出不同之处。

result = df.join(dateRange1st, df.date.between(dateRange1st.frDate, dateRange1st.toDate), 'inner')
.select(df.date, df.device, df.condition)
.withColumn('life', datediff(df.date - dataRange1st.frDate))
.orderBy(df.date)
result.show()

希望这有帮助!

最新更新