这个问题听起来可能很基本,因为我对多处理不太了解,我只是在学习。
我有python代码,它处理一个目录中的一堆文件。
with Pool(processes=cores) as pp:
pp.map(function, list)
这是我的代码:
path = '/data/personal'
print("Running with PID: %d" % getpid())
psl = PublicSuffixList()
d = defaultdict(set)
start = time()
#
files_list = glob(path)
for filename in files:
print(filename)
f = open(filename, 'r')
for n, line in enumerate(f):
line = line[:-1]
ip,reversed_domain_1= line.split('|')
reversed_domain_2 = reversed_domain_1.split('.')
reversed_domain_3 = list(reversed(reversed_domain_2))
domain = ('.'.join(reversed_domain_3))
domain = psl.get_public_suffix(domain)
d[ip].add(domain)
###
for ip, domains in d.iteritems():
for domain in domains:
print(ip,domain)
如何将其转换为在多处理池中完成?
您可以在一个单独的过程中处理每个文件,如下所示:
from os import getpid
from collections import defaultdict
from glob import glob
from multiprocessing import Pool
from time import time
from functools import partial
path = '/data/personal'
print("Running with PID: %d" % getpid())
def process_file(psl, filename):
print(filename)
f = open(filename, 'r')
for n, line in enumerate(f):
line = line[:-1]
ip,reversed_domain_1= line.split('|')
reversed_domain_2 = reversed_domain_1.split('.')
reversed_domain_3 = list(reversed(reversed_domain_2))
domain = ('.'.join(reversed_domain_3))
domain = psl.get_public_suffix(domain)
return ip, domain
if __name__ == "__main__":
psl = PublicSuffixList()
d = defaultdict(set)
start = time()
files_list = glob(path)
pp = Pool(processes=cores)
func = partial(process_file, psl)
results = pp.imap_unordered(func, files_list)
for ip, domain in results:
d[ip].add(domain)
p.close()
p.join()
for ip, domains in d.iteritems():
for domain in domains:
print(ip,domain)
请注意,defaultdict
是在父进程中填充的,因为如果不使用multiprocessing.Manager
,就无法在多个进程之间共享同一个defaultdict
。如果你愿意,你可以在这里做,但我认为没有必要。相反,只要任何子级有可用的结果,我们就会将其添加到父级中的defaultdict
中。使用imap_unordered
而不是map
使我们能够按需接收结果,而不必等待所有结果都准备好。唯一值得注意的是,使用partial
可以将psl
列表传递给所有子进程,此外还可以使用imap_unordered
将files_list
中的一个项目传递给其他子进程。
这里有一个重要的注意事项:使用multiprocessing
进行这种操作实际上可能不会提高性能。您在这里所做的很多工作都是从磁盘读取,这是无法通过多个进程加快的;您的硬盘驱动器一次只能执行一次读取操作。同时从一堆进程中获得对不同文件的读取请求实际上会减慢按顺序执行的速度,因为它可能需要不断切换到物理磁盘的不同区域才能从每个文件中读取一行新行。现在,您对每一行所做的与CPU相关的工作可能足够昂贵,足以支配I/O时间,在这种情况下,您将看到速度的提高。