我使用 pymongo
从mongodb
中一次性提取 .8
百万条记录(这是一次性过程(并对其执行一些操作。
我的代码如下所示。
proc = []
for rec in cursor: # cursor has .8 million rows
print cnt
cnt = cnt + 1
url = rec['urlk']
mkptid = rec['mkptid']
cii = rec['cii']
#self.process_single_layer(url, mkptid, cii)
proc = Process(target=self.process_single_layer, args=(url, mkptid, cii))
procs.append(proc)
proc.start()
# complete the processes
for proc in procs:
proc.join()
process_single_layer
是一个基本上是从云下载urls
并存储在本地的功能。
现在的问题是下载过程很慢,因为它必须点击一个网址。由于处理 1k 行的记录很大,因此需要 6 分钟。
为了减少我想实施Multiprocessing
的时间。但是很难看出与上述代码有任何区别。
请建议我如何提高在这种情况下的性能。
您需要计算文件中的所有行,然后生成固定数量的进程(理想情况下与处理器内核的数量相匹配(,通过队列(每个进程一个(向这些进程提供等于除法total_number_of_rows / number_of_cores
的行数。此方法背后的想法是,您将这些行的处理拆分到多个进程之间,从而实现并行性。
动态找出内核数的一种方法是执行以下操作:
import multiprocessing as mp
cores_count = mp.cpu_count()
通过避免初始行计数可以完成的轻微改进是通过创建队列列表循环添加行,然后对其应用循环迭代器。
一个完整的例子:
import queue
import multiprocessing as mp
import itertools as itools
cores_count = mp.cpu_count()
def dosomething(q):
while True:
try:
row = q.get(timeout=5)
except queue.Empty:
break
# ..do some processing here with the row
pass
if __name__ == '__main__':
processes
queues = []
# spawn the processes
for i in range(cores_count):
q = mp.Queue()
queues.append(q)
proc = Process(target=dosomething, args=(q,))
processes.append(proc)
queues_cycle = itools.cycle(queues)
for row in cursor:
q = next(queues_cycle)
q.put(row)
# do the join after spawning all the processes
for p in processes:
p.join()
在这种情况下,使用池更容易。
队列不是必需的,因为您不需要在生成的进程之间进行通信。我们可以使用Pool.map
来分配工作负载。
Pool.imap
或Pool.imap_unordered
块大小越大可能会更快。(参考: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap(如果需要,您可以使用Pool.starmap
并摆脱元组解包。
from multiprocessing import Pool
def process_single_layer(data):
# unpack the tuple and do the processing
url, mkptid, cii = data
return "downloaded" + url
def get_urls():
# replace this code: iterate over cursor and yield necessary data as a tuple
for rec in range(8):
url = "url:" + str(rec)
mkptid = "mkptid:" + str(rec)
cii = "cii:" + str(rec)
yield (url, mkptid, cii)
# you can come up with suitable process count based on the number of CPUs.
with Pool(processes=4) as pool:
print(pool.map(process_single_layer, get_urls()))