使用另一个数据帧中的值创建新的 dask 数据帧列会导致错误"chunk sizes are unknown"



我正在尝试在 dask 数据帧中创建一个新列,其值将是来自另一个数据帧的特定索引值。

import pandas as pd
import numpy as np
import dask.dataframe as dd
holdings=pd.DataFrame({'cusip': ['abcd', 'efgh', 'ijkl'], 'date': ['1/1/2000', '1/1/2005', '1/1/2010']})
ratings=pd.DataFrame({'cusip':['abcd','efgh','efgh'],'date':['1/1/2001','1/1/2004','1/1/2006'],'rating':['A','AAA','B']}
dd.from_pandas(df1, npartitions=2)

日期都是日期时间.日期类型。目标是,对于持股中的每一行,新列将包含来自评级的索引,其中该行包含持股日期的 cusip 的最新可用评级。例如,持股中新列的第二行应包含指向第二行评级的索引。

我编写了以下代码,当持股和评级只是熊猫数据帧(不是 dask)时,它完成了我正在寻找的内容:

def get_rating_index(cusip,date,ratings):
if cusip in ratings['cusip'].values:
temp=ratings[ratings['cusip']==cusip]
avail_ratings=temp[temp['date'].apply(lambda x: x<date)]
if avail_ratings.shape[0]>0:
final=avail_ratings[avail_ratings['date']==max(avail_ratings['date'].values)]
return final.index[0]
else:
return np.nan
else:
return np.nan
holdings['ratings_match']=pd.Series(get_rating_index(holdings['cusip'][i],holdings['date'][i],ratings) for i in holdings.index)

这篇文章对于我需要应用于相同数据的更简单函数很有帮助,但是当我尝试将其用于此任务时,我收到错误:"数组块大小未知:%s',(nan,),'发生在索引 0"。

这是我与 dask 一起使用的确切代码(对我与 pandas 一起使用的函数略有修改):

def get_rating_index(row):
if row['cusip'] in ratings['cusip'].values:
temp=ratings[ratings['cusip']==row['cusip']]
avail_ratings=temp[temp['date'].apply(lambda x: x<row['date'])]
if avail_ratings.shape[0]>0:
final=avail_ratings[avail_ratings['date']==max(avail_ratings['date'].values)]
return final.index[0]
else:
return -1
else:
return -1
holdings['ratings_match'] = holdings.apply(get_rating_index,meta='int', axis=1)

知道如何在不出现此错误的情况下执行此操作的任何想法?我应该注意,当我尝试查看数据(例如,使用holdings.head())时,错误会发生一次,而不是在创建列时立即发生。

这还不是一个完整的答案,但可能会让你开始:

holdings.apply(get_rating_index, meta='int', axis=1)

极有可能真的想在这里mapmap_partition。它们允许您以更直接的方式概括熊猫,并且通常更有效率。

事实上,您正在运行的功能get_rating_index感觉很像单个地图或操作位置,尽管很难说出意图是什么。另一方面,temp=ratings[ratings['cusip']==row['cusip']]看起来像是按分组操作。

最新更新