我将这个问题分为两部分。
我有与此类似的代码
for data in data_list:
rslt = query in databae where data == 10 # just some pseudo database query to give example but this single query usually takes around 30-50seconds.
if rslt.property == 'some condition here':
return rslt
这里的条件是
- 我们必须在查询后返回与条件匹配的
data_list
的第一个元素。 - 每个元素的每个数据库查询大约需要 30-40 秒。
data_list
通常非常大,大约 15-20k 元素- 不幸的是,我们不能对整个
data_list
进行单个数据库查询。我们必须循环或一次一个元素来执行此操作。
现在我的问题是,
- 如何优化此过程。目前,整个过程大约需要3-4小时。
- 我读过关于python线程和多处理的信息,但我对在这种情况下哪一个合适感到困惑。
您可以考虑使用多处理Pool
。然后,您可以使用map
将可迭代的块发送给Pool
的工作线程,以便根据给定的函数进行处理。因此,假设您的查询是一个函数,例如query(data)
:
def query(data):
rslt = query in databae where data == 10
if rslt.property == 'some condition here':
return rslt
我们将像这样使用池:
from multiprocessing import Pool
with Pool() as pool:
results = pool.map(query, data_list)
现在,根据您的要求,我们将找到第一个:
print(next(filter(None, results)))
请注意,以这种方式query
函数意味着results
将是rslt
s 和None
s 的列表,我们正在寻找第一个非None
结果。
一些注意事项:
- 请注意,
Pool
的构造者的第一个参数是processes
它允许您选择池将容纳多少个进程:如果进程
None
则使用 os.cpu_count() 返回的数字。 - 请注意,
map
具有默认为 1 的chunksize
参数,并允许选择传递给 worker 的块的大小:此方法将可迭代对象切成多个块,并将其作为单独的任务提交到进程池。可以通过将块大小设置为正整数来指定这些块的(近似)大小。
继续
map
,文档建议对具有特定块的大型迭代对象使用imap
以提高效率:请注意,这可能会导致非常长的可迭代对象内存使用率过高。考虑使用带有显式块大小选项的 imap() 或 imap_unordered() 以提高效率。
从
imap
文档中:chunksize参数与 map() 方法使用的参数相同。对于非常长的可迭代对象,使用较大的块大小值可以使作业完成速度比使用默认值
1
快得多。因此,我们实际上可以提高效率并执行以下操作:
chunksize = 100 processes = 10 with Pool(processes=processes) as pool: print(next(filter(None, pool.imap(query, data_list, chunksize=chunksize))))
在这里,您可以使用
chunksize
甚至processes
(从Pool
回来)玩,看看哪种组合会产生最佳效果。如果您有兴趣,只需将 import 语句更改为:
from multiprocessing.dummy import Pool
正如文档所说:
multiprocessing.dummy 复制了多处理的 API,但只不过是线程模块的包装器。
希望这以任何方式有所帮助