当我在 mp 库启动的每个处理器中的分块 Pandas 数据帧上使用多处理时,我收到一个表未找到错误。
我通过以下方式使用pandasql库进行SQL:
import pandasql import sqldf
pysqldf = lambda q: sqldf(q, globals())
df = pd.DataFrame({'a': [1,2,4,3,6,1,2], 'b': ['a','a','b','b','c','c','c']})
这适用于单线程:
sorted_df = pysqldf("select * from df order by b, a")
当我应用多处理来处理每个 df 块时,它不起作用 平行:
def parallelize_dataframe(df, func):
unique_bs = df.b.unique().tolist()
df_split = [df[df.a == l] for l in unique_bs]
df = pd.concat(pool.map(func, df_split))
pool = Pool(num_cores)
pool.close()
pool.join()
return df
def sort_chunks(data):
sorted_data = pysqldf("select * from data order by b, a")
return sorted_data
sorted_df = parallelize_dataframe(df, sort_chunks)
我得到的错误如下:
PandaSQLException: (sqlite3.操作错误(没有这样的表:数据 [SQL:"从数据中选择 *"](此错误的背景信息见: http://sqlalche.me/e/e3q8(
我明白错误告诉我什么。基本上,每个处理器中的数据 DF 在数据库中都找不到。我不确定此问题的解决方法是什么。任何意见将不胜感激。谢谢。
我想通了。问题是命名空间。本质上,pysqldf 接受两个参数 - 查询和命名空间,它们是全局的或局部的。如果 sqldf 函数包装在函数中,并且需要使用该函数的本地数据帧,则命名空间参数应为局部参数。
将排序数据功能修改为以下内容解决了该问题:
def sort_chunks(data):
sorted_data = pysqldf("select * from data order by b, a", locals())
return sorted_data