Python中的异步持久性



我需要加快在块中读取大型CSV文件的Python脚本的执行,进行一些处理,然后将处理的行保存到数据库中。处理10,000行然后将它们持续使用花费比较的时间(1.5秒(。当然,时代确实有一些波动,有时处理更快,有时会持续。

不幸的是,由于处理是历史的(记录是股票交易,并且根据先前的活动进行计算(,因此处理记录无法轻易平行。有可能,但是对于这个问题,可以做的事情是并行化处理块的情况,并持续存在以前的块的结果。这应该使总时间减半。

for chunk in pd.read_csv(filename, chunksize=chunksize):
    # the following two tasks in parallel
    persist (rows_from_previous_chunk) # this is I/O waiting, mostly
    rows_to_save = process(chunk)      # this is Python, not C
    # wait for the above to finish
    rows_from_previous_chunk = rows_to_save

我的问题是要执行上述建议的方法。我可以想到一些:

  1. 鉴于一项任务主要是I/O等待,我有机会在不遇到GIL争夺的情况下使用多线程。

  2. 第二个选择是使用dask,特别延迟。但是,鉴于每个任务(2s以下(使用的短时间,我不确定这是最好的方法。

  3. 第三个选项是读取一个过程&然后对行进行处理,然后将它们通过有限的队列发送到一个将保存到DB的单独的队列。使用JMS队列是过分的,我正在考虑multiprocessing.Queue()

任何建议将不胜感激。我是一个长期的Java程序员

dask的确会添加开销,但是与典型的2S任务时间相比,很小。为了保持订单,您可以使每个任务取决于上一个任务。这是一个刺的

@dask.delayed
def process_save(rows_from_previous_chunk, chunk):
    if rows_from_previous_chunk:
        persist(rows_from_previous_chunk)
    return process(chunk)
parts = dd.read_csv(filename, chunksize=chunksize).to_delayed()
prev = None
for chunk in parts:
    prev = process_save(prev, chunk)
out = dask.delayed(persist)(prev)
dask.compute(out)
out.visualize()  # should look interesting

这可能取决于您的数据库,但是如果存在最简单的方法,则可能是使用AiomySQL或Asyncpgto(例如Asyncpgto(的异步库,允许您在后台执行插入查询。

I/O绑定部分可以执行而无需GIL锁,因此您的Python部分将能够继续。

我最终以以下方法。有趣的是,使用多线程无法正常工作;将数据框传递到另一个队列以保存的是仍在阻止主线程继续工作。不是100%确定发生了什么,而是为了时间的利益,我切换到使用过程并有效。该代码是简化的,以便在下面的清晰度上进行清晰,实际上我使用了多个DB工作过程。

import multiprocessing
# this function will run into a separate process, saving the df asynchronously
def save(queue):
    db_engine = create_engine(...)
    while True:
        df  = queue.get()
        if df is None:
            break
        df.to_sql(schema="...", name="...", con=db_engine, if_exists="append", chunksize=1000, index=False)
        queue.task_done()
if __name__ == '__main__':
    queue = multiprocessing.JoinableQueue(maxsize=2) 
    worker = multiprocessing.Process(name="db_worker", target=save, args=(queue,))
    worker.daemon = True
    workers.start()
    # inside the main loop to process the df
        queue.put(df_to_save)
    # at the end 
    worker.join()  # wait for the last save job to finish before terminating the main process

相关内容

  • 没有找到相关文章

最新更新