Pandas 分组聚合(groupby + agg)如何改写为 Dask



我有一段 pandas 代码,想把它改写成 Dask。

先构造一些示例数据:

import pandas as pd
import dask.dataframe as dd

# Toy data: three ratings for each of two items (10 and 8).
df = pd.DataFrame({'item_id': [10, 10, 10, 8, 8, 8], 'rating': [3, 4, 2, 1, 2, 3]})
# The same data as a dask DataFrame split into two partitions.
ddf = dd.from_pandas(df, npartitions=2)

下面是 pandas 代码:

import numpy as np
import pandas as pd

# Reference pandas implementation: one row per item_id with rating statistics.
# The whole chain is parenthesised so it can legally span several lines.
bb = (
    df[['item_id', 'rating']]
    .groupby(['item_id'])
    .agg(
        # Percentage (rounded to an integer) of ratings >= 3.75 / < 3.75.
        item_hist_rating_up=pd.NamedAgg(
            column='rating', aggfunc=lambda s: round(100 * (s >= 3.75).sum() / len(s))
        ),
        item_hist_rating_down=pd.NamedAgg(
            column='rating', aggfunc=lambda s: round(100 * (s < 3.75).sum() / len(s))
        ),
        # Lower / upper quartile with NumPy's default linear interpolation.
        item_hist_rating_q25=pd.NamedAgg(
            column='rating', aggfunc=lambda s: np.quantile(s, q=0.25)
        ),
        item_hist_rating_q75=pd.NamedAgg(
            column='rating', aggfunc=lambda s: np.quantile(s, q=0.75)
        ),
        item_hist_rating_min=pd.NamedAgg(column='rating', aggfunc='min'),
        item_hist_rating_count=pd.NamedAgg(column='rating', aggfunc='count'),
        item_hist_rating_max=pd.NamedAgg(column='rating', aggfunc='max'),
        # 'mean' (string) instead of np.mean: same result, no pandas FutureWarning.
        item_hist_rating_avg=pd.NamedAgg(column='rating', aggfunc='mean'),
    )
    .reset_index()
    .round(2)
)
bb

我知道用 Dask 可以计算出其中四个统计量,如下:

# Per-item reductions that dask's groupby supports natively.
ddf.groupby(by=['item_id'])['rating'].aggregate(
    ['sum', 'mean', 'max', 'min']
).compute()

另外两个可以这样计算:

# Fraction (mean of a boolean mask) of each item's ratings that are >= 3.75 ...
(ddf['rating'] >= 3.75).groupby(by=ddf['item_id']).mean().compute()
# ... and the complementary fraction that are < 3.75.
(ddf['rating'] < 3.75).groupby(by=ddf['item_id']).mean().compute()

但我不知道:(1) 如何按组计算 `.quantile`;(2) 如何把这些结果合并在一起?

import numpy as np
import pandas as pd
import dask.dataframe as dd
# Per-item statistics that dask's groupby can reduce natively.
stats_df = ddf.groupby(['item_id'])['rating'].aggregate(['sum', 'mean', 'max', 'min', 'count'])
# Percentage of ratings at/above and below the 3.75 threshold (mean of a boolean mask * 100).
stats_df['rating_up'] = ddf['rating'].ge(3.75).groupby(ddf['item_id']).mean() * 100
stats_df['rating_down'] = ddf['rating'].lt(3.75).groupby(ddf['item_id']).mean() * 100

# Quantiles are computed per group through a pandas apply. `meta` declares the
# result type up front (a float64 Series named after the source column), so dask
# does not have to guess it by running the lambda on dummy data (and warn).
# `.to_frame(...)` gives each result an explicit column name, so there is no need
# for a second merge against stats_df or for positional column renaming.
q25 = ddf.groupby('item_id')['rating'].apply(
    lambda x: x.quantile(0.25), meta=('rating', 'f8')).to_frame('q25')
q75 = ddf.groupby('item_id')['rating'].apply(
    lambda x: x.quantile(0.75), meta=('rating', 'f8')).to_frame('q75')

# Index-aligned joins on item_id; resulting columns are
# sum, mean, max, min, count, rating_up, rating_down, q25, q75.
ldf = dd.merge(stats_df, q25, left_index=True, right_index=True)
ldf = dd.merge(ldf, q75, left_index=True, right_index=True)
ldf.compute().reset_index().round(2)

输出:

Out[24]: 
item_id  sum  mean  max  min  count  rating_up  rating_down  q25  q75
0        8    6   2.0    3    1      3       0.00       100.00  1.5  2.5
1       10    9   3.0    4    2      3      33.33        66.67  2.5  3.5

另一种做法是用 dask.delayed 并行执行 pandas 命令。根据 Dask 文档,并行性来自于构造许多相互独立的延迟(delayed)调用,因此下面把聚合拆分成几个延迟函数。

from dask import delayed
@delayed
def rating_up(x):
    """Return the per-item percentage of ratings >= 3.75, rounded to 2 decimals.

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns. Because of
    ``@delayed`` the call is lazy; the computed value is a DataFrame indexed by
    item_id with a single ``rating_up`` column.
    """
    return x.groupby(['item_id']).agg(
        rating_up=pd.NamedAgg(
            column='rating',
            # The group Series is named `s` so it does not shadow the frame `x`.
            aggfunc=lambda s: round(100 * (s >= 3.75).sum() / len(s), 2),
        )
    )
@delayed
def rating_down(x):
    """Return the per-item percentage of ratings < 3.75, rounded to 2 decimals.

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns. Because of
    ``@delayed`` the call is lazy; the computed value is a DataFrame indexed by
    item_id with a single ``rating_down`` column.
    """
    return x.groupby(['item_id']).agg(
        rating_down=pd.NamedAgg(
            column='rating',
            # The group Series is named `s` so it does not shadow the frame `x`.
            aggfunc=lambda s: round(100 * (s < 3.75).sum() / len(s), 2),
        )
    )
@delayed
def q_25(x):
    """Return each item's 25th-percentile rating (``np.quantile`` default method).

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns. The call is
    lazy; the computed value is a DataFrame with a ``rating_q25`` column.
    """
    return x.groupby(['item_id']).agg(
        rating_q25=pd.NamedAgg(column='rating', aggfunc=lambda s: np.quantile(s, q=0.25))
    )
@delayed
def q_75(x):
    """Return each item's 75th-percentile rating (``np.quantile`` default method).

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns. The call is
    lazy; the computed value is a DataFrame with a ``rating_q75`` column.
    """
    return x.groupby(['item_id']).agg(
        rating_q75=pd.NamedAgg(column='rating', aggfunc=lambda s: np.quantile(s, q=0.75))
    )
@delayed
def rating_min(x):
    """Return each item's minimum rating as a lazy ``rating_min`` DataFrame.

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns.
    """
    return x.groupby(['item_id']).agg(
        rating_min=pd.NamedAgg(column='rating', aggfunc='min')
    )
@delayed
def rating_max(x):
    """Return each item's maximum rating as a lazy ``rating_max`` DataFrame.

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns.
    """
    return x.groupby(['item_id']).agg(
        rating_max=pd.NamedAgg(column='rating', aggfunc='max')
    )
@delayed
def rating_count(x):
    """Return the number of non-null ratings per item as a lazy ``rating_count`` DataFrame.

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns.
    """
    return x.groupby(['item_id']).agg(
        rating_count=pd.NamedAgg(column='rating', aggfunc='count')
    )
@delayed
def rating_avg(x):
    """Return each item's mean rating as a lazy ``rating_avg`` DataFrame.

    ``x`` is the pandas DataFrame with 'item_id' and 'rating' columns.
    """
    # 'mean' (string) gives the same result as np.mean without pandas' FutureWarning
    # about passing NumPy callables to groupby aggregations.
    return x.groupby(['item_id']).agg(
        rating_avg=pd.NamedAgg(column='rating', aggfunc='mean')
    )

def stats(x):
    """Build a lazy per-item rating summary of ``x`` from separate delayed aggregations.

    Each helper performs its own groupby over ``x``, and none depends on another,
    so the scheduler is free to run them concurrently before the joins.

    Parameters
    ----------
    x : pandas DataFrame with 'item_id' and 'rating' columns.

    Returns
    -------
    A dask Delayed that computes to a DataFrame indexed by item_id with columns
    rating_count, rating_up, rating_down, rating_q25, rating_q75, rating_min,
    rating_max and rating_avg, in that order.
    """
    count_ = rating_count(x)
    up = rating_up(x)
    down = rating_down(x)
    q25 = q_25(x)
    q75 = q_75(x)
    rate_min = rating_min(x)
    rate_max = rating_max(x)
    rate_avg = rating_avg(x)
    # Parenthesised so the chained joins can span lines legally; named `summary`
    # instead of `ddf` to avoid shadowing the module-level dask DataFrame.
    summary = (
        count_.join(up)
        .join(down)
        .join(q25)
        .join(q75)
        .join(rate_min)
        .join(rate_max)
        .join(rate_avg)
    )
    return summary
# Build the lazy summary for the pandas frame, then materialise and display it.
stats_df = stats(x=df)
print(stats_df.compute().reset_index().round(decimals=2))

输出:

item_id  rating_count  rating_up  rating_down  rating_q25  rating_q75  rating_min  rating_max  rating_avg
0        8             3       0.00       100.00         1.5         2.5           1           3           2
1       10             3      33.33        66.67         2.5         3.5           2           4           3

最新更新