在特定日期条件下复制行pyspark



我有一个这样的数据集:

| DATE      | Value    |
| 2022-10-01| x        |
| 2022-11-01| y        |
| 2021-12-01| z        |
| 2022-01-01| xy       |

我准备了代码来列出此数据集中的最大日期与当前日期(2023-01-01)之间的差异。由于在我的数据集中没有2022-12-01或2023-01-01这样的日期,现在我想用相同的值复制同一行,但从前一年开始,并为正确的日期设置年份。所以输出应该像这样:

| DATE      | Value    |
| 2023-01-01| xy       |
| 2022-12-01| z        |
| 2022-10-01| x        |
| 2022-11-01| y        |
| 2021-12-01| z        |
| 2022-01-01| xy       |

列出丢失数据的代码:

from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
max_dt = max(shares_union.select("Date").distinct().rdd.flatMap(lambda x: x).collect())
max_dt
diff_months = relativedelta(datetime.now(), max_dt).months
diff_months
for m in range(1, diff_months+1):
print((max_dt + relativedelta(months=m)).replace(day=1))
print()

输出:

2022-12-01
2023-01-01

谢谢你的帮助!

我认为这种类型的问题可能很难在本地PySpark中表达。我将使用Fugue库,它允许我们用Python和Pandas来表达我们的逻辑。当解决方案准备好后,我们可以轻松地将执行引擎切换到Spark。

我们首先只使用Pandas/Python来处理逻辑,然后再考虑将其引入Spark。这应该使它更易于阅读和维护。

首先设置:

import pandas as pd
df = pd.DataFrame({"DATE": ["2022-10-01", "2022-11-01", "2021-12-01", "2022-01-01"],
"Value": ["x", "y", "z", "xy"]})
df['DATE'] = pd.to_datetime(df['DATE'])

如果我们按月计算,这个问题可能会容易一些。不管怎样,如果我没理解错的话,每个时间戳只关心前一年的月份。我们可以用一个函数来求月份和年份。同样,它将在Pandas中定义,我们可以稍后将其引入Spark。

def get_month_and_year(df: pd.DataFrame) -> pd.DataFrame:
return df.assign(month = df['DATE'].dt.month,
year = df['DATE'].dt.year)
get_month_and_year(df)

这给了我们:

DATE    Value   month   year
2022-10-01  x   10  2022
2022-11-01  y   11  2022
2021-12-01  z   12  2021
2022-01-01  xy  1   2022

对于下一个操作,我们将执行groupby-apply类型的操作,对月份进行分组,并检查是否需要根据当前时间添加新行。如果必须的话,我们就执行这个操作。请注意,在执行操作之前,我们将原始DataFrame按月分组。

from dateutil.relativedelta import relativedelta
from datetime import datetime
def generate_new_rows(df: pd.DataFrame) -> pd.DataFrame:
df['DATE'] = pd.to_datetime(df['DATE'])
max_date = df['DATE'].max()
next_date = max_date + relativedelta(years=1)
if next_date <= datetime.now():
# add new row
new_row = {'DATE': [next_date], 'Value': [df.loc[df['DATE'].idxmax()]["Value"]],
'month': [next_date.month], 'year': [next_date.year]}
df = pd.concat([df, pd.DataFrame(new_row)], ignore_index=True)
return df
# test code
temp = get_month_and_year(df)
generate_new_rows(temp.loc[temp['month'] == 12])

得到:


DATE    Value   month   year
2021-12-01  z   12  2021
2022-12-01  z   12  2022

这看起来像预期的输出,所以现在我们可以把它带到Spark与赋格。我们将使用transform()函数将它们转换为pandas_udf。在PySpark中需要Schema,所以我们需要提供它,但是Fugue提供了一个更简单的接口。

import fugue.api as fa
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)
# The only Fugue code
_tmp = fa.transform(sdf, get_month_and_year, schema="*, month:int, year:int")
out = fa.transform(_tmp, generate_new_rows, schema="*", partition={"by": "month"})
# It returns a Spark DataFrame
out.show()

输出:

+-------------------+-----+-----+----+
|               DATE|Value|month|year|
+-------------------+-----+-----+----+
|2021-12-01 00:00:00|    z|   12|2021|
|2022-12-01 00:00:00|    z|   12|2022|
|2022-10-01 00:00:00|    x|   10|2022|
|2022-01-01 00:00:00|   xy|    1|2022|
|2023-01-01 00:00:00|   xy|    1|2023|
|2022-11-01 00:00:00|    y|   11|2022|
+-------------------+-----+-----+----+

我不完全确定,但我也认为这个解决方案将比多个循环更快。如果要对多组数据执行此操作,只需在分区键中包含附加的。

希望有帮助!

相关内容

  • 没有找到相关文章

最新更新