我有一个这样的数据集:
| DATE | Value |
| 2022-10-01| x |
| 2022-11-01| y |
| 2021-12-01| z |
| 2022-01-01| xy |
我准备了代码来列出此数据集中的最大日期与当前日期(2023-01-01)之间的差异。由于在我的数据集中没有2022-12-01或2023-01-01这样的日期,现在我想用相同的值复制同一行,但从前一年开始,并为正确的日期设置年份。所以输出应该像这样:
| DATE | Value |
| 2023-01-01| xy |
| 2022-12-01| z |
| 2022-10-01| x |
| 2022-11-01| y |
| 2021-12-01| z |
| 2022-01-01| xy |
列出丢失数据的代码:
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
max_dt = max(shares_union.select("Date").distinct().rdd.flatMap(lambda x: x).collect())
max_dt
diff_months = relativedelta(datetime.now(), max_dt).months
diff_months
for m in range(1, diff_months+1):
print((max_dt + relativedelta(months=m)).replace(day=1))
print()
输出:
2022-12-01
2023-01-01
谢谢你的帮助!
我认为这种类型的问题可能很难在本地PySpark中表达。我将使用Fugue库,它允许我们用Python和Pandas来表达我们的逻辑。当解决方案准备好后,我们可以轻松地将执行引擎切换到Spark。
我们首先只使用Pandas/Python来处理逻辑,然后再考虑将其引入Spark。这应该使它更易于阅读和维护。
首先设置:
import pandas as pd
df = pd.DataFrame({"DATE": ["2022-10-01", "2022-11-01", "2021-12-01", "2022-01-01"],
"Value": ["x", "y", "z", "xy"]})
df['DATE'] = pd.to_datetime(df['DATE'])
如果我们按月计算,这个问题可能会容易一些。不管怎样,如果我没理解错的话,每个时间戳只关心前一年的月份。我们可以用一个函数来求月份和年份。同样,它将在Pandas中定义,我们可以稍后将其引入Spark。
def get_month_and_year(df: pd.DataFrame) -> pd.DataFrame:
return df.assign(month = df['DATE'].dt.month,
year = df['DATE'].dt.year)
get_month_and_year(df)
这给了我们:
DATE Value month year
2022-10-01 x 10 2022
2022-11-01 y 11 2022
2021-12-01 z 12 2021
2022-01-01 xy 1 2022
对于下一个操作,我们将执行groupby-apply类型的操作,对月份进行分组,并检查是否需要根据当前时间添加新行。如果必须的话,我们就执行这个操作。请注意,在执行操作之前,我们将原始DataFrame按月分组。
from dateutil.relativedelta import relativedelta
from datetime import datetime
def generate_new_rows(df: pd.DataFrame) -> pd.DataFrame:
df['DATE'] = pd.to_datetime(df['DATE'])
max_date = df['DATE'].max()
next_date = max_date + relativedelta(years=1)
if next_date <= datetime.now():
# add new row
new_row = {'DATE': [next_date], 'Value': [df.loc[df['DATE'].idxmax()]["Value"]],
'month': [next_date.month], 'year': [next_date.year]}
df = pd.concat([df, pd.DataFrame(new_row)], ignore_index=True)
return df
# test code
temp = get_month_and_year(df)
generate_new_rows(temp.loc[temp['month'] == 12])
得到:
DATE Value month year
2021-12-01 z 12 2021
2022-12-01 z 12 2022
这看起来像预期的输出,所以现在我们可以把它带到Spark与赋格。我们将使用transform()
函数将它们转换为pandas_udf。在PySpark中需要Schema,所以我们需要提供它,但是Fugue提供了一个更简单的接口。
import fugue.api as fa
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)
# The only Fugue code
_tmp = fa.transform(sdf, get_month_and_year, schema="*, month:int, year:int")
out = fa.transform(_tmp, generate_new_rows, schema="*", partition={"by": "month"})
# It returns a Spark DataFrame
out.show()
输出:
+-------------------+-----+-----+----+
| DATE|Value|month|year|
+-------------------+-----+-----+----+
|2021-12-01 00:00:00| z| 12|2021|
|2022-12-01 00:00:00| z| 12|2022|
|2022-10-01 00:00:00| x| 10|2022|
|2022-01-01 00:00:00| xy| 1|2022|
|2023-01-01 00:00:00| xy| 1|2023|
|2022-11-01 00:00:00| y| 11|2022|
+-------------------+-----+-----+----+
我不完全确定,但我也认为这个解决方案将比多个循环更快。如果要对多组数据执行此操作,只需在分区键中包含附加的。
希望有帮助!