我正在使用Pandas数据帧处理100,000行文本数据。每隔一段时间(每 100,000 <5 个(,我就会有一个错误,用于我选择删除的一行。错误处理功能如下:
def unicodeHandle(datai):
for i, row in enumerate(datai['LDTEXT']):
print(i)
#print(text)
try:
text = row.read()
text.strip().split('[W_]+')
print(text)
except UnicodeDecodeError as e:
datai.drop(i, inplace=True)
print('Error at index {}: {!r}'.format(i, row))
print(e)
return datai
该功能工作正常,我已经使用了几周。
问题是我永远不知道何时会发生错误,因为数据来自不断添加到的数据库(或者我可能会提取不同的数据(。关键是,我必须遍历每一行以运行我的错误测试函数unicodeHandle
以便初始化我的数据。这个过程大约需要~5分钟,这有点烦人。我正在尝试实现多处理以加快循环。通过网络和各种教程,我想出了:
def unicodeMP(datai):
chunks = [datai[i::8] for i in range(8)]
pool = mp.Pool(processes=8)
results = pool.apply_async(unicodeHandle, chunks)
while not results.ready():
print("One Sec")
return results.get()
if __name__ == "__main__":
fast = unicodeMP(datai)
当我运行它进行多处理时,即使通过我的 CPU 说它以更高的利用率运行,它也花费与常规相同的时间。此外,代码将错误作为正常错误返回,而不是我完成的干净数据帧。我在这里错过了什么?
如何对数据帧上的函数使用多处理?
您可以尝试使用 dask 对数据帧进行多处理
import dask.dataframe as dd
partitions = 7 # cpu_cores - 1
ddf = dd.from_pandas(df, npartitions=partitions)
ddf.map_partitions(lambda df: df.apply(unicodeHandle).compute(scheduler='processes')
您可以在此处阅读有关dask
的更多信息