Dask: groupby-resample具有不同的聚合规则



我正在处理Dask中的面板数据(即id和时间段的面板),并希望重新采样从微秒到30秒的频率。示例数据如下所示:

size     price       ID
datetime                                           
2018-09-26 13:50:00.000600   300   17.8185       AR
2018-09-26 13:50:00.004797    25   37.1165     BCOR
2018-09-26 13:50:00.005955   300   17.8185       AR
2018-09-26 13:50:00.006066   100   78.6200      XLI
2018-09-26 13:50:00.006862   100   73.0600      ABT
2018-09-26 13:50:00.007164   100   73.0600      ABT
2018-09-26 13:50:00.008643   100   73.3332      FAS
2018-09-26 13:50:00.008762   100   73.0600      ABT
2018-09-26 13:50:00.008793     2  114.4950     MSFT
2018-09-26 13:50:00.008978   100   20.6350      NWL

,其中ID为string, datetime为datetime对象(当前设置为索引),size为int64, price为float64。我想:

  1. groupbyID
  2. 重新采样到30秒频率
  3. ,按其平均值汇总价格,按其总和汇总规模。本质上,按不同的函数聚合列。

我知道Dask不支持按样例分组操作,但是根据这里的一篇优秀的文章,使用Dask和pandas混合使用似乎是可行的。

我目前的尝试(基于上面的链接文章)是:

def per_group(blk):
return blk.resample('30S').agg({blk['price']: np.mean, blk['size']: np.sum})
ddf.groupby('ID').apply(per_group, meta=ddf).compute() 

但是它返回TypeError: 'Series' objects are mutable, thus they cannot be hashed。我的感觉是它与"ID"列有关,但我不能弄清楚。我也试着提供meta={'size': np.int64, 'price': np.float64, 'ID': 'object'},但无济于事。

希望看到任何其他方法可以更有效地完成!谢谢。

要使用.resample,索引应该是日期时间(或其他合适的dtype)。一种解决方案是修改聚合函数并添加日期时间索引的设置(另一种是提前按日期时间进行索引):

def per_group(df):
return (
df
.set_index("datetime")
.resample("30S")
.agg({"price": "mean", "size": "mean"})
)
ddf.groupby("ID").apply(per_group).compute()

完整的可复制片段:

from io import StringIO
from dask.dataframe import from_pandas
from pandas import read_fwf, to_datetime
data = StringIO(
"""
datetime                    size     price       ID
2018-09-26 13:50:00.000600   300   17.8185       AR
2018-09-26 13:50:00.004797    25   37.1165     BCOR
2018-09-26 13:50:00.005955   300   17.8185       AR
2018-09-26 13:50:00.006066   100   78.6200      XLI
2018-09-26 13:50:00.006862   100   73.0600      ABT
2018-09-26 13:50:00.007164   100   73.0600      ABT
2018-09-26 13:50:00.008643   100   73.3332      FAS
2018-09-26 13:50:00.008762   100   73.0600      ABT
2018-09-26 13:50:00.008793     2  114.4950     MSFT
2018-09-26 13:50:00.008978   100   20.6350      NWL
"""
)
df = read_fwf(data)
df["datetime"] = df["datetime"] + " " + df["Unnamed: 1"]
df["datetime"] = to_datetime(df["datetime"])
ddf = from_pandas(df, npartitions=2)

def per_group(df):
return (
df.set_index("datetime").resample("30S").agg({"price": "mean", "size": "mean"})
)

ddf.groupby("ID").apply(per_group).compute()

相关内容

  • 没有找到相关文章

最新更新