I have 2 dataframes. df1 looks like this:
DATE QUANTITY
2015-10-28 14
2015-10-29 881
2015-10-30 533
2015-10-31 634
2015-11-01 637
…
I have a second df, df2, which looks like this:
STARTDATE ENDDATE VALUE
2015-10-25 2015-10-29 2
2015-11-01 2015-11-15 3
The second df defines windows… I want to fill df1 using this window information, like this…
Final df:
DATE QUANTITY VALUE
2015-10-25 nan 2
2015-10-26 nan 2
2015-10-27 nan 2
2015-10-28 14 2
2015-10-29 881 2
2015-10-30 533 0
2015-10-31 634 0
2015-11-01 637 3
2015-11-02 nan 3
2015-11-03 nan 3
2015-11-04 nan 3
...
2015-11-15 nan 3
How can I do this in pandas and in PySpark?
Any help would be greatly appreciated!
Pandas:
You can create a date range with pd.date_range, then explode it before an outer merge.
First convert the date values to datetime dtype (skip this step if they already are):
df2[['STARTDATE','ENDDATE']] = df2[['STARTDATE','ENDDATE']].apply(pd.to_datetime)
df1['DATE'] = pd.to_datetime(df1['DATE'])
e = df2[['VALUE']].join(df2.apply(lambda x: pd.date_range(x['STARTDATE'], x['ENDDATE']), axis=1)
                        .explode().rename("DATE"))
e['DATE'] = pd.to_datetime(e['DATE'])  # explode leaves object dtype; coerce back for the merge
final = e.merge(df1, on='DATE', how='outer')[['DATE', 'VALUE', 'QUANTITY']]  # column order is optional
print(final.sort_values("DATE"))
DATE VALUE QUANTITY
0 2015-10-25 2.0 NaN
1 2015-10-26 2.0 NaN
2 2015-10-27 2.0 NaN
3 2015-10-28 2.0 14.0
4 2015-10-29 2.0 881.0
20 2015-10-30 NaN 533.0
21 2015-10-31 NaN 634.0
5 2015-11-01 3.0 637.0
6 2015-11-02 3.0 NaN
.........
.......
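To match the desired output in the question exactly (a VALUE of 0 outside any window), the NaNs left by the outer merge can be filled afterwards. A minimal sketch with a hypothetical two-row stand-in for the merged frame:

```python
import pandas as pd

# stand-in for `final` after the outer merge above
final = pd.DataFrame({
    "DATE": pd.to_datetime(["2015-10-29", "2015-10-30"]),
    "VALUE": [2.0, float("nan")],
    "QUANTITY": [881.0, 533.0],
})

# replace NaN (no matching window) with 0 and restore integer dtype
final["VALUE"] = final["VALUE"].fillna(0).astype(int)
print(final["VALUE"].tolist())  # [2, 0]
```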
PySpark:
I assume the date columns already have a date type; if not, convert them with to_date.
Use sequence to generate the sequence of dates, then explode before an outer join:
from pyspark.sql import functions as F
final = (sdf2.select(F.explode(F.sequence("STARTDATE","ENDDATE")).alias("DATE"),"VALUE")
.join(sdf1,on='DATE',how='outer').fillna({"VALUE":0}))
final.show()
+----------+-----+--------+
| DATE|VALUE|QUANTITY|
+----------+-----+--------+
|2015-10-25| 2| null|
|2015-10-26| 2| null|
|2015-10-27| 2| null|
|2015-10-28| 2| 14|
|2015-10-29| 2| 881|
|2015-10-30| 0| 533|
|2015-10-31| 0| 634|
|2015-11-01| 3| 637|
|2015-11-02| 3| null|
.......
......
I've put together an example for you. It's self-explanatory, so you should be able to follow it.
import numpy as np
import pandas as pd
# Creating a sample dataset
date = np.array('2015-10-28', dtype=np.datetime64)
date = date + np.arange(5)
df1 = pd.DataFrame()
df1['Date'] = date
df1['Quantity'] = [10,20,30,40,50]
display(df1)
date1 = pd.period_range('2015-10-25', periods=5, freq='3D')
date2 = pd.period_range('2015-10-30', periods=5, freq='3D')
df2 = pd.DataFrame()
df2['StartDate'] = date1
df2['EndDate'] = date2
df2['value'] = [2,2,2,2,2]
display(df2)
# Solving the problem
date_range = []
values = []
for start, end, value in df2[['StartDate', 'EndDate', 'value']].values:
    # np.arange is end-exclusive, so add one day to include EndDate itself
    dates = np.arange(np.datetime64(str(start)), np.datetime64(str(end)) + np.timedelta64(1, 'D'))
    date_range += list(dates)
    values += [value] * len(dates)
temp_df = pd.DataFrame()
temp_df['Date'] = date_range
temp_df['Value'] = values
final_df = df1.merge(temp_df, on='Date', how='outer')
display(final_df)
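One caveat about the loop above: np.arange over datetime64 values is end-exclusive, so without an adjustment the EndDate itself is dropped from each window. A quick check:

```python
import numpy as np

# np.arange over datetime64 is end-exclusive: the end date is not produced
dates = np.arange(np.datetime64("2015-10-25"), np.datetime64("2015-10-28"))
print(len(dates))  # 3 days: the 25th, 26th and 27th
```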
If it solves your problem, please accept the answer.
On the face of it, this problem calls for a range join, which (AFAIK) can't be done in pandas alone. The combination of pandas and SQLite below should solve it without a cross join (which would inflate the row count) or a loop.
# solution using pandas as sqlite
#### Setup
import pandas as pd
import sqlite3
lst1 = [
    ['2015-10-28', 14],
    ['2015-10-29', 881],
    ['2015-10-30', 533],
    ['2015-10-31', 634],
    ['2015-11-01', 637],
]
df1 = pd.DataFrame(lst1, columns = ['DATE','QUANTITY'])
lst2 = [
    ['2015-10-25', '2015-10-29', 2],
    ['2015-11-01', '2015-11-15', 3],
]
df2 = pd.DataFrame(lst2, columns = ['STARTDATE','ENDDATE','VALUE'])
#### Solution
# Create a dataframe with every date between the min STARTDATE and max ENDDATE in df2
dt_df = pd.DataFrame(pd.date_range(start=df2['STARTDATE'].min(), end=df2['ENDDATE'].max()), columns=['DATE'])
dt_df['DATE'] = dt_df['DATE'].astype('str')  # ISO-format strings compare correctly in SQLite
conn = sqlite3.connect(":memory:")
df1.to_sql("df1", conn, index=False)
df2.to_sql("df2", conn, index=False)
dt_df.to_sql("dt_df", conn, index=False)
sql = """
select
    dt_df.DATE,
    df1.QUANTITY,
    df2.VALUE
from dt_df
left join df1
    on dt_df.DATE = df1.DATE
left join df2
    on dt_df.DATE >= df2.STARTDATE
    and dt_df.DATE <= df2.ENDDATE
"""
op_df = pd.read_sql_query(sql,conn)
op_df
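For dates that fall outside every df2 window, the second left join returns NULL, which pandas reads back as NaN in VALUE. To match the 0s in the desired output, those can be filled after the query; a minimal sketch with a hypothetical stand-in for the unmatched rows of op_df:

```python
import pandas as pd

# stand-in for op_df rows where no df2 window matched (SQLite NULL -> NaN)
op_df = pd.DataFrame({
    "DATE": ["2015-10-30", "2015-10-31"],
    "QUANTITY": [533, 634],
    "VALUE": [None, None],
})

# fill the unmatched windows with 0, as in the question's final df
op_df["VALUE"] = op_df["VALUE"].fillna(0).astype(int)
print(op_df["VALUE"].tolist())  # [0, 0]
```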