Populate a new column based on 2 window dates from another dataframe (in Pandas and PySpark)



I have 2 dataframes. df1 looks like this:

DATE    QUANTITY
2015-10-28  14
2015-10-29  881 
2015-10-30  533
2015-10-31  634
2015-11-01  637

I have a second df, df2, which looks like this:

STARTDATE      ENDDATE     VALUE
2015-10-25     2015-10-29   2 
2015-11-01     2015-11-15   3

The second df defines a window... I want to fill a final df using this window information, like this...

Final df:

DATE    QUANTITY    VALUE
2015-10-25  nan     2
2015-10-26  nan     2
2015-10-27  nan     2
2015-10-28  14      2
2015-10-29  881     2
2015-10-30  533     0
2015-10-31  634     0
2015-11-01  637     3
2015-11-02  nan     3
2015-11-03  nan     3
2015-11-04  nan     3
...
2015-11-15  nan     3

How can I do this in pandas and in PySpark?

Any help would be greatly appreciated!

Pandas:

You can convert the dates with pd.to_datetime, build a date range per window with pd.date_range, explode it, and then do an outer merge:

First convert the date values to datetime dtype (skip this step if they are already datetime):

df2[['STARTDATE', 'ENDDATE']] = df2[['STARTDATE', 'ENDDATE']].apply(pd.to_datetime)
df1['DATE'] = pd.to_datetime(df1['DATE'])

# Expand each [STARTDATE, ENDDATE] window into one row per date,
# keeping VALUE aligned via the index join
e = df2[['VALUE']].join(
    df2.apply(lambda x: pd.date_range(x['STARTDATE'], x['ENDDATE']), axis=1)
       .explode()
       .rename("DATE")
)
final = e.merge(df1, on='DATE', how='outer')[['DATE', 'VALUE', 'QUANTITY']]  # ordering is optional
print(final.sort_values("DATE"))
DATE  VALUE  QUANTITY
0  2015-10-25    2.0       NaN
1  2015-10-26    2.0       NaN
2  2015-10-27    2.0       NaN
3  2015-10-28    2.0      14.0
4  2015-10-29    2.0     881.0
20 2015-10-30    NaN     533.0
21 2015-10-31    NaN     634.0
5  2015-11-01    3.0     637.0
6  2015-11-02    3.0       NaN
.........
.......
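Note that this leaves VALUE as NaN for dates outside the windows, whereas the desired output shows 0 there. If you want to match that exactly, a small follow-up step (mirroring the fillna used in the PySpark answer below):

# Fill VALUE with 0 for dates outside any window, then restore integer dtype
final['VALUE'] = final['VALUE'].fillna(0).astype(int)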

PySpark

I'm assuming the date columns already have date types; if not, convert them first using to_date:
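For reference, a minimal sketch of that conversion, assuming the columns hold 'yyyy-MM-dd' strings and the Spark DataFrames are named sdf1 and sdf2 as used below:

from pyspark.sql import functions as F

# Only needed if the columns are still strings
sdf1 = sdf1.withColumn("DATE", F.to_date("DATE"))
sdf2 = (sdf2.withColumn("STARTDATE", F.to_date("STARTDATE"))
            .withColumn("ENDDATE", F.to_date("ENDDATE")))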

Generate the sequence of dates with sequence (available since Spark 2.4), explode it, and then do an outer join:

from pyspark.sql import functions as F

# Expand each [STARTDATE, ENDDATE] window into one row per date, outer-join
# onto the quantities, and fill VALUE with 0 outside the windows
final = (sdf2.select(F.explode(F.sequence("STARTDATE", "ENDDATE")).alias("DATE"), "VALUE")
             .join(sdf1, on='DATE', how='outer')
             .fillna({"VALUE": 0}))
final.show()
+----------+-----+--------+
|      DATE|VALUE|QUANTITY|
+----------+-----+--------+
|2015-10-25|    2|    null|
|2015-10-26|    2|    null|
|2015-10-27|    2|    null|
|2015-10-28|    2|      14|
|2015-10-29|    2|     881|
|2015-10-30|    0|     533|
|2015-10-31|    0|     634|
|2015-11-01|    3|     637|
|2015-11-02|    3|    null|
.......
......

I've created an example for you. It's self-explanatory, so you'll be able to follow it.

import numpy as np
import pandas as pd

# Creating a sample dataset
date = np.array('2015-10-28', dtype=np.datetime64)
date = date + np.arange(5)
df1 = pd.DataFrame()
df1['Date'] = date
df1['Quantity'] = [10, 20, 30, 40, 50]
display(df1)

date1 = pd.period_range('2015-10-25', periods=5, freq='3D')
date2 = pd.period_range('2015-10-30', periods=5, freq='3D')
df2 = pd.DataFrame()
df2['StartDate'] = date1
df2['EndDate'] = date2
df2['value'] = [2, 2, 2, 2, 2]
display(df2)

# Solving the problem: expand each window into individual dates,
# then merge onto df1
date_range = []
values = []
for i, j, value in df2[['StartDate', 'EndDate', 'value']].values:
    # note: np.arange excludes the stop value, so EndDate itself is omitted
    dates = np.arange(str(i), str(j), dtype=np.datetime64)
    date_range += list(dates)
    values += [value] * len(dates)

temp_df = pd.DataFrame()
temp_df['Date'] = date_range
temp_df['Value'] = values
final_df = df1.merge(temp_df, on='Date', how='outer')
display(final_df)

If it solves your problem, please accept the answer with the green tick.

On the face of it, this problem calls for a range join, which (AFAIK) isn't possible in pandas. The combination of pandas and SQLite below should solve the problem without a cross join (which would inflate the row count) or a loop.

# solution using pandas with sqlite
#### Setup
import pandas as pd
import sqlite3

lst1 = [
    ['2015-10-28', 14],
    ['2015-10-29', 881],
    ['2015-10-30', 533],
    ['2015-10-31', 634],
    ['2015-11-01', 637],
]
df1 = pd.DataFrame(lst1, columns=['DATE', 'QUANTITY'])

lst2 = [
    ['2015-10-25', '2015-10-29', 2],
    ['2015-11-01', '2015-11-15', 3],
]
df2 = pd.DataFrame(lst2, columns=['STARTDATE', 'ENDDATE', 'VALUE'])

#### Solution
# Create a dataframe with all required dates between the earliest STARTDATE
# and the latest ENDDATE in df2
dt_df = pd.DataFrame(
    pd.date_range(start=df2['STARTDATE'].min(), end=df2['ENDDATE'].max()),
    columns=['DATE'],
)
# Store dates as ISO strings so they compare correctly in SQLite
dt_df['DATE'] = dt_df['DATE'].astype('str')

conn = sqlite3.connect(":memory:")
df1.to_sql("df1", conn, index=False)
df2.to_sql("df2", conn, index=False)
dt_df.to_sql("dt_df", conn, index=False)

sql = """
select
    dt_df.DATE,
    df1.QUANTITY,
    df2.VALUE
from dt_df
left join df1
    on dt_df.DATE = df1.DATE
left join df2
    on dt_df.DATE >= df2.STARTDATE
    and dt_df.DATE <= df2.ENDDATE
"""
op_df = pd.read_sql_query(sql, conn)
op_df
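As an aside, pd.merge_asof can approximate the range join in pure pandas when the windows don't overlap; the sketch below relies on that assumption and reuses df1 and df2 from the setup above:

# Convert the string dates to datetime for merge_asof
df1_dt = df1.assign(DATE=pd.to_datetime(df1['DATE']))
df2_dt = df2.assign(STARTDATE=pd.to_datetime(df2['STARTDATE']),
                    ENDDATE=pd.to_datetime(df2['ENDDATE']))

# Full calendar covering all windows
cal = pd.DataFrame({'DATE': pd.date_range(df2_dt['STARTDATE'].min(),
                                          df2_dt['ENDDATE'].max())})

# merge_asof matches each DATE to the latest STARTDATE at or before it;
# VALUE is then cleared wherever DATE falls past that window's ENDDATE
out = pd.merge_asof(cal, df2_dt.sort_values('STARTDATE'),
                    left_on='DATE', right_on='STARTDATE')
out.loc[out['DATE'] > out['ENDDATE'], 'VALUE'] = float('nan')

final = out.merge(df1_dt, on='DATE', how='left')[['DATE', 'QUANTITY', 'VALUE']]

If the windows can overlap, the SQLite query above remains the safer choice.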
