我需要合并两个基于字符串列不完全匹配的大数据集。我有广泛的数据集,可以帮助我比字符串距离更准确地确定最佳匹配,但我首先需要为每个字符串返回几个"顶级匹配"。
可再生的例子:
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
df2_index = [i[2] for i in tup]
scores = [i[1] for i in tup]
return pd.DataFrame({
"df1_index": [idx] * n,
"df2_index": df2_index,
"score": scores
})
import pandas as pd
from fuzzywuzzy import process
s1 = pd.Series(["two apples", "one orange", "my banana", "red grape", "huge kiwi"])
s2 = pd.Series(["a couple of apples", "old orange", "your bananas", "purple grape", "tropical fruit"])
pd.concat([example_function(index, value, s2, 2) for index, value in s1.items()]).reset_index()
我没有成功地并行化这个函数。似乎最接近我要做的是多处理实现,但即使有了星图,我也没有得到结果。我想有一个简单的方法来实现这一点,但还没有找到一个有效的方法。
我愿意接受任何关于如何优化我的代码的建议,但在这种情况下并行处理将是一个合适的解决方案,因为如果顺序完成,它看起来将花费大约4-5个小时(事后看来这是一个慷慨的估计)。
谢谢你的解决方案。df1是7000行df2是70000行。对于下面的结果,我已经搜索了df2中的所有70,000行,以查找df1中的前20行。
- 数据帧连接方法(原始方法):96秒
- 字典链方法:90秒
- 添加并行任务(4个工人):77秒
- 使用rapidfuzz代替fuzzywuzzy: 6.73秒
- 使用rapidfuzz与任务(4个工人):5.29秒
下面是优化后的代码:
from dask.distributed import Client
from dask import delayed
from rapidfuzz import process, fuzz
from itertools import chain
client = Client(n_workers = 4, processes = False)
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, scorer = fuzz.WRatio, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
jobs = [delayed(example_function)(index, value, t3, 20) for index, value in t1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
print(df)
client.close()
并行处理并没有达到我预期的效果。也许这个函数的设置不理想,或者随着我包含更多的迭代,它将继续扩展到更大的影响。无论哪种方式,它都起了作用,所以我在我的个人解决方案中使用了它。谢谢所有的
这并没有回答你的问题,但我很想知道它是否会加快速度。只返回字典而不是dataframe应该更有效:
from itertools import chain
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
data = chain(*(example_function(index, value, s2, 2) for index, value in s1.items()))
df = pd.DataFrame.from_records(data)
print(df)
输出:
idx index2 score
0 0 0 86
1 0 3 43
2 1 1 80
3 1 0 45
4 2 2 76
5 2 1 32
6 3 3 76
7 3 1 53
8 4 0 30
9 4 3 29
以下是你问题的一个答案:
Dask提供了一种非常方便的方法来并行执行Python:
from dask.distributed import Client
from dask import delayed
from itertools import chain
def example_function(idx, string, comparisons, n):
tup = process.extract(string, comparisons, limit = n)
return [{'idx': idx, 'index2': t[2], 'score': t[1]} for t in tup]
client = Client(n_workers=4) # Choose number of cores
jobs = [delayed(example_function)(index, value, s2, 2) for index, value in s1.items()]
data = delayed(jobs)
df = pd.DataFrame.from_records(chain(*data.compute()))
client.close()
在大量数据上尝试一下,看看它是否加快了执行速度(我在这个小样本上尝试过,它并没有更快)。