我使用的是Spark数据帧。我有一个用例,需要将日期增加一。如果递增日期恰好是周末,那么我需要将其递增到下一周/工作日。
val df = Seq(
("50312", "2021-12-01", "0.9992019"),
("50312", "2021-12-02", "0.20171201"),
("50312", "2021-12-03", "2.9992019")
).toDF("id","some_date","item_value")
.withColumn("nextworking_day", date_add(col("some_date"),1))
下一个工作日应该是下个工作日,而不是周末。怎么做?
您可以使用dayofweek
来获取工作日的编号,如果是星期六,则加2;如果是星期五,则加3。
val day = dayofweek(col("some_date"))
val nextworkday = col("some_date") + when(day > 5, -day + 9).otherwise(1)
val df = Seq(
("50312", "2021-12-01", "0.9992019"),
("50312", "2021-12-02", "0.20171201"),
("50312", "2021-12-03", "2.9992019")
).toDF("id","some_date","item_value")
.withColumn("some_date", col("some_date").cast("date"))
.withColumn("nextworking_day", nextworkday)
df.show()
+-----+----------+----------+---------------+
| id| some_date|item_value|nextworking_day|
+-----+----------+----------+---------------+
|50312|2021-12-01| 0.9992019| 2021-12-02|
|50312|2021-12-02|0.20171201| 2021-12-03|
|50312|2021-12-03| 2.9992019| 2021-12-06|
+-----+----------+----------+---------------+
编写一个udf来检查日期应该可以解决问题以下是在pyspark中运行的示例代码,不包含假日代码,但您可以创建一个List或enum,并根据您的区域添加一个条件
import pyspark.sql.functions as f
from pyspark.sql.types import TimestampType
from datetime import datetime, timedelta
@f.udf(returnType=TimestampType())
def get_convert_date_udf(date_column):
datetime_object = datetime.strptime(date_column, "%Y-%m-%d")
new_datetime_object = datetime_object + timedelta(days=1)
day = new_datetime_object.strftime("%A")
if day == "Sunday":
new_datetime_object += timedelta(days=1)
elif day == "Saturday":
new_datetime_object += timedelta(days=2)
return new_datetime_object
df = df.withColumn("next_working_date",
get_convert_date_udf(f.col("some_date")))