我正在处理Dask中的面板数据(即id和时间段的面板),并希望重新采样从微秒到30秒的频率。示例数据如下所示:
size price ID
datetime
2018-09-26 13:50:00.000600 300 17.8185 AR
2018-09-26 13:50:00.004797 25 37.1165 BCOR
2018-09-26 13:50:00.005955 300 17.8185 AR
2018-09-26 13:50:00.006066 100 78.6200 XLI
2018-09-26 13:50:00.006862 100 73.0600 ABT
2018-09-26 13:50:00.007164 100 73.0600 ABT
2018-09-26 13:50:00.008643 100 73.3332 FAS
2018-09-26 13:50:00.008762 100 73.0600 ABT
2018-09-26 13:50:00.008793 2 114.4950 MSFT
2018-09-26 13:50:00.008978 100 20.6350 NWL
,其中ID为string
, datetime为datetime
对象(当前设置为索引),size为int64
, price为float64
。我想:
- groupby
ID
- 重新采样到30秒频率
- ,按其平均值汇总价格,按其总和汇总规模。本质上,按不同的函数聚合列。
我知道Dask不支持按样例分组操作,但是根据这里的一篇优秀的文章,使用Dask和pandas混合使用似乎是可行的。
我目前的尝试(基于上面的链接文章)是:
def per_group(blk):
return blk.resample('30S').agg({blk['price']: np.mean, blk['size']: np.sum})
ddf.groupby('ID').apply(per_group, meta=ddf).compute()
但是它返回TypeError: 'Series' objects are mutable, thus they cannot be hashed
。我的感觉是它与"ID"列有关,但我不能弄清楚。我也试着提供meta={'size': np.int64, 'price': np.float64, 'ID': 'object'}
,但无济于事。
希望看到任何其他方法可以更有效地完成!谢谢。
要使用.resample
,索引应该是日期时间(或其他合适的dtype)。一种解决方案是修改聚合函数并添加日期时间索引的设置(另一种是提前按日期时间进行索引):
def per_group(df):
return (
df
.set_index("datetime")
.resample("30S")
.agg({"price": "mean", "size": "mean"})
)
ddf.groupby("ID").apply(per_group).compute()
完整的可复制片段:
from io import StringIO
from dask.dataframe import from_pandas
from pandas import read_fwf, to_datetime
data = StringIO(
"""
datetime size price ID
2018-09-26 13:50:00.000600 300 17.8185 AR
2018-09-26 13:50:00.004797 25 37.1165 BCOR
2018-09-26 13:50:00.005955 300 17.8185 AR
2018-09-26 13:50:00.006066 100 78.6200 XLI
2018-09-26 13:50:00.006862 100 73.0600 ABT
2018-09-26 13:50:00.007164 100 73.0600 ABT
2018-09-26 13:50:00.008643 100 73.3332 FAS
2018-09-26 13:50:00.008762 100 73.0600 ABT
2018-09-26 13:50:00.008793 2 114.4950 MSFT
2018-09-26 13:50:00.008978 100 20.6350 NWL
"""
)
df = read_fwf(data)
df["datetime"] = df["datetime"] + " " + df["Unnamed: 1"]
df["datetime"] = to_datetime(df["datetime"])
ddf = from_pandas(df, npartitions=2)
def per_group(df):
return (
df.set_index("datetime").resample("30S").agg({"price": "mean", "size": "mean"})
)
ddf.groupby("ID").apply(per_group).compute()