我有一个带有列的火花数据框 - timestamp
类型的"日期"和 long
类型的"数量"。对于每个日期,我都有一些数量值。日期按递增顺序排序。但是有些日期是缺失的。例如 -当前DF -
Date | Quantity
10-09-2016 | 1
11-09-2016 | 2
14-09-2016 | 0
16-09-2016 | 1
17-09-2016 | 0
20-09-2016 | 2
如您所见,df 有一些缺失的日期,例如 12-09-2016、13-09-2016 等。我想在缺少日期的数量字段中输入 0,以便生成的 df 应如下所示 -
Date | Quantity
10-09-2016 | 1
11-09-2016 | 2
12-09-2016 | 0
13-09-2016 | 0
14-09-2016 | 0
15-09-2016 | 0
16-09-2016 | 1
17-09-2016 | 0
18-09-2016 | 0
19-09-2016 | 0
20-09-2016 | 2
任何有关此的帮助/建议将不胜感激。提前谢谢。请注意,我正在用scala编码。
为了便于理解代码,我以有点冗长的方式编写了这个答案。它可以被优化。
需要的进口
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}
字符串到有效日期格式的 UDF
val date_transform = udf((date: String) => {
val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
val dt = LocalDate.parse(date, dtFormatter)
"%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
.replaceAll(" ", "0")
})
以下 UDF 代码取自 迭代日期范围
def fill_dates = udf((start: String, excludedDiff: Int) => {
val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val fromDt = LocalDateTime.parse(start, dtFormatter)
(1 to (excludedDiff - 1)).map(day => {
val dt = fromDt.plusDays(day)
"%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
.replaceAll(" ", "0")
})
})
设置示例数据帧 ( df
)
val df = Seq(
("10-09-2016", 1),
("11-09-2016", 2),
("14-09-2016", 0),
("16-09-2016", 1),
("17-09-2016", 0),
("20-09-2016", 2)).toDF("date", "quantity")
.withColumn("date", date_transform($"date").cast(TimestampType))
.withColumn("quantity", $"quantity".cast(LongType))
df.printSchema()
root
|-- date: timestamp (nullable = true)
|-- quantity: long (nullable = false)
df.show()
+-------------------+--------+
| date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00| 1|
|2016-09-11 00:00:00| 2|
|2016-09-14 00:00:00| 0|
|2016-09-16 00:00:00| 1|
|2016-09-17 00:00:00| 0|
|2016-09-20 00:00:00| 2|
+-------------------+--------+
创建一个临时数据帧(tempDf
)以union
df
:
val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
.filter($"diff" > 1) // Pick date diff more than one day to generate our date
.withColumn("next_dates", fill_dates($"date", $"diff"))
.withColumn("quantity", lit("0"))
.withColumn("date", explode($"next_dates"))
.withColumn("date", $"date".cast(TimestampType))
tempDf.show(false)
+-------------------+--------+----+------------------------+
|date |quantity|diff|next_dates |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0 |3 |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0 |3 |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0 |2 |[2016-09-15] |
|2016-09-18 00:00:00|0 |3 |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0 |3 |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+
现在合并两个数据帧
val result = df.union(tempDf.select("date", "quantity"))
.orderBy("date")
result.show()
+-------------------+--------+
| date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00| 1|
|2016-09-11 00:00:00| 2|
|2016-09-12 00:00:00| 0|
|2016-09-13 00:00:00| 0|
|2016-09-14 00:00:00| 0|
|2016-09-15 00:00:00| 0|
|2016-09-16 00:00:00| 1|
|2016-09-17 00:00:00| 0|
|2016-09-18 00:00:00| 0|
|2016-09-19 00:00:00| 0|
|2016-09-20 00:00:00| 2|
+-------------------+--------+
基于@mrsrinivas很好的答案,这里是 PySpark 版本。
需要的进口
from typing import List
import datetime
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, lit, udf, datediff, lead, explode
from pyspark.sql.types import DateType, ArrayType
UDF 创建下一个日期的范围
def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
return [start_date + datetime.timedelta(days=days) for days in range(1, diff)]
创建填充日期的日期框的功能(支持"分组"列):
def _get_fill_dates_df(df: DataFrame, date_column: str, group_columns: List[str], fill_column: str) -> DataFrame:
get_next_dates_udf = udf(_get_next_dates, ArrayType(DateType()))
window = Window.orderBy(*group_columns, date_column)
return df.withColumn("_diff", datediff(lead(date_column, 1).over(window), date_column))
.filter(col("_diff") > 1).withColumn("_next_dates", get_next_dates_udf(date_column, "_diff"))
.withColumn(fill_column, lit("0")).withColumn(date_column, explode("_next_dates"))
.drop("_diff", "_next_dates")
函数的用法:
fill_df = _get_fill_dates_df(df, "Date", [], "Quantity")
df = df.union(fill_df)
它假定日期列已处于日期类型。
下面是一个轻微的修改,将此功能与月份一起使用并输入度量列(应设置为零的列)而不是组列:
from typing import List
import datetime
from dateutil import relativedelta
import math
import pyspark.sql.functions as f
from pyspark.sql import DataFrame, Window
from pyspark.sql.types import DateType, ArrayType
def fill_time_gaps_date_diff_based(df: pyspark.sql.dataframe.DataFrame, measure_columns: list, date_column: str):
group_columns = [col for col in df.columns if col not in [date_column]+measure_columns]
# save measure sums for qc
qc = df.agg({col: 'sum' for col in measure_columns}).collect()
# convert month to date
convert_int_to_date = f.udf(lambda mth: datetime.datetime(year=math.floor(mth/100), month=mth%100, day=1), DateType())
df = df.withColumn(date_column, convert_int_to_date(date_column))
# sort values
df = df.orderBy(group_columns)
# get_fill_dates_df (instead of months_between also use date_diff for days)
window = Window.orderBy(*group_columns, date_column)
# calculate diff column
fill_df = df.withColumn(
"_diff",
f.months_between(f.lead(date_column, 1).over(window), date_column).cast(IntegerType())
).filter(
f.col("_diff") > 1
)
# generate next dates
def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
return [
start_date + relativedelta.relativedelta(months=months)
for months in range(1, diff)
]
get_next_dates_udf = f.udf(_get_next_dates, ArrayType(DateType()))
fill_df = fill_df.withColumn(
"_next_dates",
get_next_dates_udf(date_column, "_diff")
)
# set measure columns to 0
for col in measure_columns:
fill_df = fill_df.withColumn(col, f.lit(0))
# explode next_dates column
fill_df = fill_df.withColumn(date_column, f.explode('_next_dates'))
# drop unneccessary columns
fill_df = fill_df.drop(
"_diff",
"_next_dates"
)
# union df with fill_df
df = df.union(fill_df)
# qc: should be removed for productive runs
if qc != df.agg({col: 'sum' for col in measure_columns}).collect():
raise ValueError('Sums before and after run do not fit.')
return df
请注意,我假设月份以 YYYYMM 的形式以整数形式给出。这可以通过修改"转换月份至今"部分轻松调整。