我已经使用熊猫在两个数据帧之间实现了一个模糊字符串匹配算法。我的问题是如何将其转换为使用多个内核的 dask 操作?我的程序在纯python上运行大约3-4天,我想并行化操作以优化时间成本。我已经使用多处理包使用以下代码提取内核数:
numCores = multiprocessing.cpu_count()
fields = ['id','phase','new']
emb = pd.read_csv('my_csv.csv', skipinitialspace=True, usecols=fields)
然后,我必须根据每个字符串关联的数值将数据帧 emb 细分为两个数据帧(emb1、emb2(。就像我通过匹配的字符串将一个数据帧与值为 3 的所有元素与另一个数据帧中的相应值 2 匹配一样。纯熊猫操作的代码如下。
emb1 = emb[emb.phase.isin([3.0])]
emb1.set_index('id',inplace=True)
emb2 = emb[emb.phase.isin([2.0,1.5])]
emb2.set_index('id',inplace=True)
def fuzzy_match(x, choices, scorer, cutoff):
return process.extractOne(x, choices=choices, scorer=scorer, score_cutoff=cutoff)
FuzzyWuzzyResults = pd.DataFrame(emb1.sort_index().loc[:,'strings'].apply(fuzzy_match, args = (emb2.loc[:,'strings'],fuzz.ratio,90)))
我尝试使用以下代码进行 dask 实现:
emb1 = dd.from_pandas(emb1, npartitions=numCores)
emb2 = dd.from_pandas(emb2, npartitions=numCores)
但是为两个数据帧运行 lambda 函数让我感到困惑。有什么想法吗?
所以我只是修复了我的代码以删除数据帧的手动分区并改用 groupby。
代码如下:
for i in [2.0,1.5]:
FuzzyWuzzyResults = emb.map_partitions(lambda df: df.groupby('phase').get_group(3.0)['drugs'].apply(fuzzy_match, args=(df.groupby('phase').get_group(i)['drugs'],fuzz.ratio,90)), meta=('results')).compute()
不确定它是否准确,但至少它正在运行,并且在所有 CPU 内核上也是如此。