有人知道如何在PySpark中进行R data.table滚动联接吗?
这里借用本关于滚动联接的例子和很好的解释;
sales<-data.table(saleID=c("S1","S2","S3","S4","S5"),
saleDate=as.Date(c("2014-2-20","2014-5-1","2014-6-15","2014-7- 1","2014-12-31")))
commercials<-data.table(commercialID=c("C1","C2","C3","C4"),
commercialDate=as.Date(c("2014-1-1","2014-4-1","2014-7-1","2014-9-15")))
setkey(sales,"saleDate")
setkey(commercials,"commercialDate")
sales[commercials, roll=TRUE]
结果是;
saleDate saleID commercialID
1: 2014-01-01 NA C1
2: 2014-04-01 S1 C2
3: 2014-07-01 S4 C3
4: 2014-09-15 S4 C4
非常感谢你的帮助。
滚动联接不是join
+fillna
首先,滚动连接是而不是与join
和fillna
相同!只有当连接的表的键(根据data.table,即左表和右表(在主表中具有等价项时,才会出现这种情况。data.table滚动联接不需要此功能。
据我所知,没有直接的对等物,我搜索了很长一段时间。甚至还有一个问题https://github.com/pandas-dev/pandas/issues/7546.但是:
熊猫的解决方案:
不过,在熊猫身上有一个解决方案。假设右边的数据表是表A,左边的数据表则是表B。
- 按键对表A和表B进行排序
- 将列
tag
添加到全部为0的a,将列tag
添加到全部均为1的B - 从B中删除除键和
tag
之外的所有列(可以省略,但这样更清楚(,并调用表B'。保留B作为原件——我们以后需要它 - 将A与B'连接到C,并忽略从B'开始的行有许多NA这一事实
- 按键对C排序
- 用
C = C.assign(groupNr = np.cumsum(C.tag))
制作一个新的累计列 - 在
tag
上使用过滤(query
(来去除所有的B'-行 - 将运行计数器列
groupNr
添加到原始B(从0到N-1或从1到N的整数,具体取决于您想要正向还是反向滚动联接( - 在
groupNr
上加入B和C
编程代码
#0. 'date' is the key for the rolling join. It does not have to be a date.
A = pd.DataFrame.from_dict(
{'date': pd.to_datetime(["2014-3-1", "2014-5-1", "2014-6-1", "2014-7-1", "2014-12-1"]),
'value': ["a1", "a2", "a3", "a4", "a5"]})
B = pd.DataFrame.from_dict(
{'date': pd.to_datetime(["2014-1-15", "2014-3-15", "2014-6-15", "2014-8-15", "2014-11-15", "2014-12-15"]),
'value': ["b1", "b2", "b3", "b4", "b5", "b6"]})
#1. Sort the table A and and B each by key.
A = A.sort_values('date')
B = B.sort_values('date')
#2. Add a column tag to A which are all 0 and a column tag to B that are all 1.
A['tag'] = 0
B['tag'] = 1
#3. Delete all columns except the key and tagfrom B (can be omitted, but it is clearer this way) and call the table B'. Keep B as an original - we are going to need it later.
B_ = B[['date','tag']] # You need two [], because you get a series otherwise.
#4. Concatenate A with B' to C and ignore the fact that the rows from B' has many NAs.
C = pd.concat([A, B_])
#5. Sort C by key.
C = C.sort_values('date')
#6. Make a new cumsum column with C = C.assign(groupNr = np.cumsum(C.tag))
C = C.assign(groupNr = np.cumsum(C.tag))
#7. Using filtering (query) on tag get rid of all B'-rows.
C = C[C.tag == 0]
#8. Add a running counter column groupNr to the original B (integers from 0 to N-1 or from 1 to N, depending on whether you want forward or backward rolling join).
B['groupNr'] = range(len(B)+1)[1:] # B's values are carried forward to A's values
B['groupNr'] = range(len(B)) # B's values are carried backward to A's values
#9. Join B with C on groupNr to D.
D = C.set_index('groupNr').join(B.set_index('groupNr'), lsuffix='_A', rsuffix='_B')
我也有一个类似的问题,用pandas.merge_asof解决了。
以下是暴露案例的快速解决方案:
sales = pd.DataFrame.from_dict(
{'saleDate': pd.to_datetime(["2014-02-20","2014-05-01","2014-06-15","2014-07-01","2014-12-31"]),
'saleID': ["S1","S2","S3","S4","S5"]})
commercials = pd.DataFrame.from_dict(
{'commercialDate': pd.to_datetime(["2014-01-01","2014-04-01","2014-07-01","2014-09-15"]),
'commercialID': ["C1","C2","C3","C4"]}
result = pd.merge_asof(commercials,
sales,
left_on='commercialDate',
right_on='saleDate')
# Ordering for easier comparison
result = result[['commercialDate','saleID','commercialID' ]]
结果与预期相同:
commercialDate saleID commercialID
0 2014-01-01 NaN C1
1 2014-04-01 S1 C2
2 2014-07-01 S4 C3
3 2014-09-15 S4 C4
这可能是一个更简单的解决方案。
sales.asfreq("D",method="ffill").join(commercials,how="outer").dropna(subset= ["commercialID"])
我在中的第一个示例中对此进行了测试https://gormanalysis.com/r-data-table-rolling-joins/它是有效的。类似的方法可能用于其他滚动连接。