我需要加快在块中读取大型CSV文件的Python脚本的执行,进行一些处理,然后将处理的行保存到数据库中。处理10,000行然后将它们持续使用花费比较的时间(1.5秒(。当然,时代确实有一些波动,有时处理更快,有时会持续。
不幸的是,由于处理是历史的(记录是股票交易,并且根据先前的活动进行计算(,因此处理记录无法轻易平行。有可能,但是对于这个问题,可以做的事情是并行化处理块的情况,并持续存在以前的块的结果。这应该使总时间减半。
for chunk in pd.read_csv(filename, chunksize=chunksize):
# the following two tasks in parallel
persist (rows_from_previous_chunk) # this is I/O waiting, mostly
rows_to_save = process(chunk) # this is Python, not C
# wait for the above to finish
rows_from_previous_chunk = rows_to_save
我的问题是要执行上述建议的方法。我可以想到一些:
鉴于一项任务主要是I/O等待,我有机会在不遇到GIL争夺的情况下使用多线程。
第二个选择是使用dask,特别延迟。但是,鉴于每个任务(2s以下(使用的短时间,我不确定这是最好的方法。
第三个选项是读取一个过程&然后对行进行处理,然后将它们通过有限的队列发送到一个将保存到DB的单独的队列。使用JMS队列是过分的,我正在考虑
multiprocessing.Queue()
任何建议将不胜感激。我是一个长期的Java程序员
dask的确会添加开销,但是与典型的2S任务时间相比,很小。为了保持订单,您可以使每个任务取决于上一个任务。这是一个刺的
@dask.delayed
def process_save(rows_from_previous_chunk, chunk):
if rows_from_previous_chunk:
persist(rows_from_previous_chunk)
return process(chunk)
parts = dd.read_csv(filename, chunksize=chunksize).to_delayed()
prev = None
for chunk in parts:
prev = process_save(prev, chunk)
out = dask.delayed(persist)(prev)
dask.compute(out)
out.visualize() # should look interesting
这可能取决于您的数据库,但是如果存在最简单的方法,则可能是使用AiomySQL或Asyncpgto(例如Asyncpgto(的异步库,允许您在后台执行插入查询。
I/O绑定部分可以执行而无需GIL锁,因此您的Python部分将能够继续。
我最终以以下方法。有趣的是,使用多线程无法正常工作;将数据框传递到另一个队列以保存的是仍在阻止主线程继续工作。不是100%确定发生了什么,而是为了时间的利益,我切换到使用过程并有效。该代码是简化的,以便在下面的清晰度上进行清晰,实际上我使用了多个DB工作过程。
import multiprocessing
# this function will run into a separate process, saving the df asynchronously
def save(queue):
db_engine = create_engine(...)
while True:
df = queue.get()
if df is None:
break
df.to_sql(schema="...", name="...", con=db_engine, if_exists="append", chunksize=1000, index=False)
queue.task_done()
if __name__ == '__main__':
queue = multiprocessing.JoinableQueue(maxsize=2)
worker = multiprocessing.Process(name="db_worker", target=save, args=(queue,))
worker.daemon = True
workers.start()
# inside the main loop to process the df
queue.put(df_to_save)
# at the end
worker.join() # wait for the last save job to finish before terminating the main process