Compute a time difference after subtracting weekend time


TIME_SPENT_WORKING_DAYS

ID  START_TIME_              END_TIME_                TIME_DIFF
1   2022-04-18 09:58:38.906  2022-04-18 12:50:27.204  10308298
2   2022-04-18 21:26:25.671  2022-04-18 21:26:36.313  10642
3   2022-04-18 21:27:12.388  2022-04-18 21:27:27.227  14839
4   2022-04-19 09:57:40.453  2022-04-19 09:59:07.612  87159

Input:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, '2022-04-18 09:58:38.906', '2022-04-18 12:50:27.204'),
     (2, '2022-04-18 21:26:25.671', '2022-04-18 21:26:36.313'),
     (3, '2022-04-18 21:27:12.388', '2022-04-18 21:27:27.227'),
     (4, '2022-04-19 09:57:40.453', '2022-04-29 09:59:07.612')],
    ['ID', 'START_TIME_', 'END_TIME_'])
Approach using the higher-order functions aggregate and filter:
df = df.withColumn(
    'TIME_SPENT_WORKING_DAYS',
    F.aggregate(
        # Sequence of calendar days in the range, keeping only Mon-Fri
        F.expr("filter(sequence(to_date(START_TIME_), to_date(END_TIME_)), x -> weekday(x) < 5)"),
        F.lit(0).cast('long'),
        # For each working day, clamp to [START_TIME_, END_TIME_] and sum the seconds
        lambda acc, d: acc + F.least(F.date_add(d, 1), F.to_timestamp('END_TIME_')).cast('long')
                           - F.greatest(d, F.to_timestamp('START_TIME_')).cast('long')
    )
)
df.show(truncate=0)
# +---+-----------------------+-----------------------+-----------------------+
# |ID |START_TIME_            |END_TIME_              |TIME_SPENT_WORKING_DAYS|
# +---+-----------------------+-----------------------+-----------------------+
# |1  |2022-04-18 09:58:38.906|2022-04-18 12:50:27.204|10309                  |
# |2  |2022-04-18 21:26:25.671|2022-04-18 21:26:36.313|11                     |
# |3  |2022-04-18 21:27:12.388|2022-04-18 21:27:27.227|15                     |
# |4  |2022-04-19 09:57:40.453|2022-04-29 09:59:07.612|691287                 |
# +---+-----------------------+-----------------------+-----------------------+

F.aggregate is only available from Spark 3.1; on Spark 2.4+ the same logic can be written with expr:

df = df.withColumn(
    'TIME_SPENT_WORKING_DAYS',
    F.expr("""
        aggregate(
            filter(sequence(to_date(START_TIME_), to_date(END_TIME_)), x -> weekday(x) < 5),
            cast(0 as long),
            (acc, d) -> acc + cast(least(date_add(d, 1), to_timestamp(END_TIME_)) as long)
                            - cast(greatest(d, to_timestamp(START_TIME_)) as long)
        )
    """)
)
  • Approach using explode and groupBy:

    df = df.withColumn('day', F.explode(F.sequence(F.to_date('START_TIME_'), F.to_date('END_TIME_'))))
    # dayofweek: 1 = Sunday, 7 = Saturday
    workday = (~F.dayofweek('day').isin([1, 7])).cast('long')
    # Parentheses keep the subtraction on the continuation line part of the expression
    seconds = (F.least(F.date_add('day', 1), F.to_timestamp('END_TIME_')).cast('long')
               - F.greatest('day', F.to_timestamp('START_TIME_')).cast('long'))
    df = df.groupBy('ID', 'START_TIME_', 'END_TIME_').agg(
        F.sum(seconds).alias('TIME_DIFF'),
        F.sum(seconds * workday).alias('TIME_SPENT_WORKING_DAYS')
    )
    df.show(truncate=0)
    # +---+-----------------------+-----------------------+---------+-----------------------+
    # |ID |START_TIME_            |END_TIME_              |TIME_DIFF|TIME_SPENT_WORKING_DAYS|
    # +---+-----------------------+-----------------------+---------+-----------------------+
    # |1  |2022-04-18 09:58:38.906|2022-04-18 12:50:27.204|10309    |10309                  |
    # |2  |2022-04-18 21:26:25.671|2022-04-18 21:26:36.313|11       |11                     |
    # |3  |2022-04-18 21:27:12.388|2022-04-18 21:27:27.227|15       |15                     |
    # |4  |2022-04-19 09:57:40.453|2022-04-29 09:59:07.612|864087   |691287                 |
    # +---+-----------------------+-----------------------+---------+-----------------------+
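As a sanity check, the same per-working-day accounting can be sketched in plain Python with datetime (a hypothetical helper, not part of either Spark solution); it mirrors the least/greatest clamping above and the per-timestamp truncation that casting a timestamp to long performs, ignoring any session-timezone effects:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def floor_seconds(t):
    # Mirrors Spark's cast(timestamp as long): whole seconds, fraction dropped
    return (t - EPOCH) // timedelta(seconds=1)

def working_seconds(start, end):
    total = 0
    d = start.date()
    while d <= end.date():
        if d.weekday() < 5:  # Monday=0 .. Friday=4
            day_start = datetime(d.year, d.month, d.day)
            # Clamp the day to the [start, end] window, like greatest/least
            lo = max(day_start, start)
            hi = min(day_start + timedelta(days=1), end)
            total += floor_seconds(hi) - floor_seconds(lo)
        d += timedelta(days=1)
    return total

# Rows 1 and 4 from the Spark output
print(working_seconds(datetime(2022, 4, 18, 9, 58, 38, 906000),
                      datetime(2022, 4, 18, 12, 50, 27, 204000)))  # 10309
print(working_seconds(datetime(2022, 4, 19, 9, 57, 40, 453000),
                      datetime(2022, 4, 29, 9, 59, 7, 612000)))    # 691287
```

Row 4 spans the weekend of April 23-24, which is why its 691287 working-day seconds fall short of the raw TIME_DIFF of 864087.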
    