在花了很多时间试图了解多处理之后,我想出了以下代码,这是一个基准测试:
示例1:
from multiprocessing import Process
class Alter(Process):
def __init__(self, word):
Process.__init__(self)
self.word = word
self.word2 = ''
def run(self):
# Alter string + test processing speed
for i in range(80000):
self.word2 = self.word2 + self.word
if __name__=='__main__':
# Send a string to be altered
thread1 = Alter('foo')
thread2 = Alter('bar')
thread1.start()
thread2.start()
# wait for both to finish
thread1.join()
thread2.join()
print(thread1.word2)
print(thread2.word2)
这将在2秒内完成(多线程时间的一半)。出于好奇,我决定运行下一个:
示例2:
word2 = 'foo'
word3 = 'bar'
word = 'foo'
for i in range(80000):
word2 = word2 + word
word = 'bar'
for i in range(80000):
word3 = word3 + word
print(word2)
print(word3)
让我感到恐怖的是,这只持续了不到半秒!
这是怎么回事?我希望多处理运行得更快——考虑到示例1是示例2分为两个进程的,它不应该在示例2的一半时间内完成吗?
更新:
在考虑了Chris的反馈后,我包含了消耗最多处理时间的"实际"代码,并引导我考虑多处理:
self.ListVar = [[13379+ strings],[13379+ strings],
[13379+ strings],[13379+ strings]]
for b in range(len(self.ListVar)):
self.list1 = []
self.temp = []
for n in range(len(self.ListVar[b])):
if not self.ListVar[b][n] in self.temp:
self.list1.insert(n, self.ListVar[b][n] + '(' +
str(self.ListVar[b].count(self.ListVar[b][n])) +
')')
self.temp.insert(0, self.ListVar[b][n])
self.ListVar[b] = list(self.list1)
多处理可能对你正在做的事情很有用,但不是你想使用它的方式。因为你基本上是在对列表的每个成员进行一些计算,所以你可以使用multiprocessing.Pool.map
方法,并行地对列表成员进行计算。
下面的示例显示了使用单个进程和multiprocessing.Pool.map
:的代码性能
from multiprocessing import Pool
from random import choice
from string import printable
from time import time
def build_test_list():
# Builds a test list consisting of 5 sublists of 10000 strings each.
# each string is 20 characters long
testlist = [[], [], [], [], []]
for sublist in testlist:
for _ in xrange(10000):
sublist.append(''.join(choice(printable) for _ in xrange(20)))
return testlist
def process_list(l):
# the time-consuming code
result = []
tmp = []
for n in range(len(l)):
if l[n] not in tmp:
result.insert(n, l[n]+' ('+str(l.count(l[n]))+')')
tmp.insert(0, l[n])
return result
def single(l):
# process the test list elements using a single process
results = []
for sublist in l:
results.append(process_list(sublist))
return results
def multi(l):
# process the test list elements in parallel
pool = Pool()
results = pool.map(process_list, l)
return results
print "Building the test list..."
testlist = build_test_list()
print "Processing the test list using a single process..."
starttime = time()
singleresults = single(testlist)
singletime = time() - starttime
print "Processing the test list using multiple processes..."
starttime = time()
multiresults = multi(testlist)
multitime = time() - starttime
# make sure they both return the same thing
assert singleresults == multiresults
print "Single process: {0:.2f}sec".format(singletime)
print "Multiple processes: {0:.2f}sec".format(multitime)
输出:
Building the test list...
Processing the test list using a single process...
Processing the test list using multiple processes...
Single process: 34.73sec
Multiple processes: 24.97sec
ETA:既然您已经发布了代码,我可以告诉您有一种简单的方法可以更快地完成您正在做的事情(快100倍)。
我看到您正在做的是在字符串列表中的每个项上添加一个括号中的频率。您可以创建一个字典,将每个元素映射到其频率,而不是每次计算所有元素(正如您可以使用cProfile确认的那样,这是迄今为止代码中最大的瓶颈)。这样,您只需要浏览列表两次——一次创建频率字典,一次使用它添加频率。
在这里,我将展示我的新方法,计时,并使用生成的测试用例将其与旧方法进行比较。测试用例甚至显示新结果与旧结果完全相同注意:下面您真正需要注意的是new_method。
import random
import time
import collections
import cProfile
LIST_LEN = 14000
def timefunc(f):
t = time.time()
f()
return time.time() - t
def random_string(length=3):
"""Return a random string of given length"""
return "".join([chr(random.randint(65, 90)) for i in range(length)])
class Profiler:
def __init__(self):
self.original = [[random_string() for i in range(LIST_LEN)]
for j in range(4)]
def old_method(self):
self.ListVar = self.original[:]
for b in range(len(self.ListVar)):
self.list1 = []
self.temp = []
for n in range(len(self.ListVar[b])):
if not self.ListVar[b][n] in self.temp:
self.list1.insert(n, self.ListVar[b][n] + '(' + str(self.ListVar[b].count(self.ListVar[b][n])) + ')')
self.temp.insert(0, self.ListVar[b][n])
self.ListVar[b] = list(self.list1)
return self.ListVar
def new_method(self):
self.ListVar = self.original[:]
for i, inner_lst in enumerate(self.ListVar):
freq_dict = collections.defaultdict(int)
# create frequency dictionary
for e in inner_lst:
freq_dict[e] += 1
temp = set()
ret = []
for e in inner_lst:
if e not in temp:
ret.append(e + '(' + str(freq_dict[e]) + ')')
temp.add(e)
self.ListVar[i] = ret
return self.ListVar
def time_and_confirm(self):
"""
Time the old and new methods, and confirm they return the same value
"""
time_a = time.time()
l1 = self.old_method()
time_b = time.time()
l2 = self.new_method()
time_c = time.time()
# confirm that the two are the same
assert l1 == l2, "The old and new methods don't return the same value"
return time_b - time_a, time_c - time_b
p = Profiler()
print p.time_and_confirm()
当我运行这个程序时,它的次数为(15.9638121128082282005961179733276367),这意味着它的速度大约快了250倍,尽管这个优势取决于列表的长度和每个列表中的频率分布。我相信你会同意,有了这种速度优势,你可能不需要使用多处理:)
(我的原始答案在下面留给后人)
ETA:顺便说一句,值得注意的是,这个算法在列表的长度上大致是线性的,而你使用的代码是二次的。这意味着元件的数量越大,它的性能就越有优势。例如,如果将每个列表的长度增加到1000000,则运行只需5秒。根据推断,旧代码需要一天的时间:)
这取决于您正在执行的操作。例如:
import time
NUM_RANGE = 100000000
from multiprocessing import Process
def timefunc(f):
t = time.time()
f()
return time.time() - t
def multi():
class MultiProcess(Process):
def __init__(self):
Process.__init__(self)
def run(self):
# Alter string + test processing speed
for i in xrange(NUM_RANGE):
a = 20 * 20
thread1 = MultiProcess()
thread2 = MultiProcess()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
def single():
for i in xrange(NUM_RANGE):
a = 20 * 20
for i in xrange(NUM_RANGE):
a = 20 * 20
print timefunc(multi) / timefunc(single)
在我的机器上,多进程操作只占用单线程操作的60%左右的时间。
此示例太小,无法从多处理中获益。
启动一个新流程时会有很多开销。如果涉及大量的处理,这将是可以忽略的。但你的例子并没有那么密集,所以你一定会注意到开销。
您可能会注意到与实际线程有更大的区别,太糟糕的python(好吧,CPython)在CPU绑定线程方面存在问题。
这个线程非常有用!
只是对上面David Robinson提供的第二个代码的快速观察(12年1月8日5:34回答),这是更适合我当前需求的代码。
在我的例子中,我以前有一个目标函数在没有多处理的情况下运行时间的记录。当使用他的代码来实现多处理函数时,他的timefunc(multi)并没有反映multi的实际时间,而是似乎反映了在父级中花费的时间。
我所做的是将计时函数外部化,我得到的时间看起来更像预期:
start = timefunc()
multi()/single()
elapsed = (timefunc()-start)/(--number of workers--)
print(elapsed)
在我使用双核的情况下,"x"工作者使用目标函数执行的总时间是用"x"迭代在目标函数上运行简单for循环的两倍。
我是多处理的新手,所以请对这一观察保持谨慎。