使用pandasql与多处理结束时挂了一些时间(即同时使用许多过程时(。
匹配函数在一组输入数据(不使用多处理(上正常工作。多处理代码可以与其他功能一起工作。当匹配函数用于多处理时,一个或多个过程往往会悬挂。我还尝试将多处理上下文更改为" Spawn",以避免可能从父过程中复制锁,但这无济于事。
def matching(inputData):
q = """
SELECT
df1.time,
df2.time,
df1.lat,
df2.lat,
df1.lng,
df2.lng,
(JulianDay(df2.time) - JulianDay(df1.time)) * 24 * 60 as timeDiff
FROM
df1
LEFT JOIN
df2
WHERE
timeDiff < 5 AND
timeDiff >= -2 AND
ABS(df1.lat - df2.lat) < .02 AND
ABS(df1.lng - df2.lng) < .02;
"""
currentDate, df1, df2 = inputData
result = pandasql.sqldf(q, locals())
return result
pool = multiprocessing.Pool(multiprocessing.cpu_count())
df = pandas.concat(pool.map(matching, matchingData))
pool.close()
pool.join()
输入数据MatchingData是包含数据集对的元组列表。每个数据集对对应于单个日期。匹配代码应一次将文件对划分为16个进程。结果应该是包含所有日期的合并数据的熊猫数据框。
这个问题似乎是在每个过程中使用sqlite的sqlalchemy软件包源于pandasql软件包。为了模仿Pandasql使用SQLalchemy软件包正在做的事情,我编写了以下代码。
import multiprocessing as mp
import sqlalchemy
def testing(iteration):
print("Started for iteration {}".format(iteration))
engine = sqlalchemy.create_engine('sqlite://',
echo = True)
print("Created engine for iteration {}".format(iteration))
with engine.connect() as conn:
print("Established database connection for iteration {}".format(iteration))
pool = mp.Pool(2)
results = pool.map(testing, range(2))
pool.close()
pool.join()
sqlalchemy成功地创建了这两个引擎,但仅为两个引擎之一创建了连接。可以通过使用MySQL而不是SQLITE使用SQLalchemy解决问题。因为pandasql使用sqlite,因此不应将其用于多处理。