pandasql与多处理时悬挂



使用pandasql与多处理结束时挂了一些时间(即同时使用许多过程时(。

匹配函数在一组输入数据(不使用多处理(上正常工作。多处理代码可以与其他功能一起工作。当匹配函数用于多处理时,一个或多个过程往往会悬挂。我还尝试将多处理上下文更改为" Spawn",以避免可能从父过程中复制锁,但这无济于事。

def matching(inputData):
    q = """
        SELECT
            df1.time,
            df2.time,
            df1.lat,
            df2.lat,
            df1.lng,
            df2.lng,
            (JulianDay(df2.time) - JulianDay(df1.time)) * 24 * 60  as timeDiff
        FROM 
            df1
        LEFT JOIN
            df2
        WHERE
            timeDiff < 5 AND 
            timeDiff >= -2 AND
            ABS(df1.lat - df2.lat) < .02 AND
            ABS(df1.lng - df2.lng) < .02;
    """
    currentDate, df1, df2 = inputData
    result = pandasql.sqldf(q, locals())
    return result
pool = multiprocessing.Pool(multiprocessing.cpu_count())
df = pandas.concat(pool.map(matching, matchingData))
pool.close()
pool.join()

输入数据MatchingData是包含数据集对的元组列表。每个数据集对对应于单个日期。匹配代码应一次将文件对划分为16个进程。结果应该是包含所有日期的合并数据的熊猫数据框。

这个问题似乎是在每个过程中使用sqlite的sqlalchemy软件包源于pandasql软件包。为了模仿Pandasql使用SQLalchemy软件包正在做的事情,我编写了以下代码。

import multiprocessing as mp
import sqlalchemy
def testing(iteration):
    print("Started for iteration {}".format(iteration))
    engine = sqlalchemy.create_engine('sqlite://', 
                                  echo = True)
    print("Created engine for iteration {}".format(iteration))
    with engine.connect() as conn:
        print("Established database connection for iteration {}".format(iteration))
pool = mp.Pool(2)
results = pool.map(testing, range(2))
pool.close()
pool.join()

sqlalchemy成功地创建了这两个引擎,但仅为两个引擎之一创建了连接。可以通过使用MySQL而不是SQLITE使用SQLalchemy解决问题。因为pandasql使用sqlite,因此不应将其用于多处理。

相关内容

  • 没有找到相关文章

最新更新