我正在努力学习如何实现计算蒙特卡罗模拟的多处理。我复制了这个简单教程中的代码,目的是计算积分。我还将其与WolframAlpha的答案进行了比较,并计算了误差。我的代码的第一部分没有问题,只是用来定义积分函数和声明一些常量:
import numpy as np
import multiprocessing as mp
import time
def integrate(iterations):
np.random.seed()
mc_sum = 0
chunks = 10000
chunk_size = int(iterations/chunks)
for i in range(chunks):
u = np.random.uniform(size=chunk_size)
mc_sum += np.sum(np.exp(-u * u))
normed = mc_sum / iterations
return normed
wolfram_answer = 0.746824132812427
mc_iterations = 1000000000
但在接下来的两部分中会发生一些非常诡异的事情(我给它们贴上了标签,因为这很重要(。首先(标记为"BLOCK 1"(,我在没有任何多处理的情况下进行模拟,只是为了获得一个基准。在这之后(标记为"BLOCK 2"(,我做了同样的事情,但有一个多处理步骤。如果你正在复制这个,你可能需要根据你的机器有多少核心来调整num_procs
变量:
#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer
print(mc_iterations, "iterations on single-thread:",
single_duration, "seconds.")
print("Estimation error:", error_single)
print("")
#### BLOCK 2
if __name__ == "__main__":
num_procs = 8
multi_iterations = int(mc_iterations / num_procs)
multi_before = time.time()
pool = mp.Pool(processes = num_procs)
multi_result = pool.map(integrate, [multi_iterations]*num_procs)
multi_result = np.array(multi_result).mean()
multi_after = time.time()
multi_duration = np.round(multi_after - multi_before, 3)
error_multi = (wolfram_answer - multi_result)/wolfram_answer
print(num_procs, "threads with", multi_iterations, "iterations each:",
multi_duration, "seconds.")
print("Estimation error:", error_multi)
输出为:
1000000000 iterations on single-thread: 37.448 seconds.
Estimation error: 1.17978774235e-05
8 threads with 125000000 iterations each: 54.697 seconds.
Estimation error: -5.88380936901e-06
因此,多处理速度较慢。这并非闻所未闻;也许多处理带来的开销比并行化带来的收益还要多?
但是,事实并非如此观察当我只是注释掉第一个块时会发生什么:
#### BLOCK 1
##single_before = time.time()
##single = integrate(mc_iterations)
##single_after = time.time()
##single_duration = np.round(single_after - single_before, 3)
##error_single = (wolfram_answer - single)/wolfram_answer
##
##print(mc_iterations, "iterations on single-thread:",
## single_duration, "seconds.")
##print("Estimation error:", error_single)
##print("")
#### BLOCK 2
if __name__ == "__main__":
num_procs = 8
multi_iterations = int(mc_iterations / num_procs)
multi_before = time.time()
pool = mp.Pool(processes = num_procs)
multi_result = pool.map(integrate, [multi_iterations]*num_procs)
multi_result = np.array(multi_result).mean()
multi_after = time.time()
multi_duration = np.round(multi_after - multi_before, 3)
error_multi = (wolfram_answer - multi_result)/wolfram_answer
print(num_procs, "threads with", multi_iterations, "iterations each:",
multi_duration, "seconds.")
print("Estimation error:", error_multi)
输出为:
8 threads with 125000000 iterations each: 6.662 seconds.
Estimation error: 3.86063069069e-06
没错,完成多处理的时间从55秒下降到了不到7秒!这甚至不是最奇怪的部分。观察当我将区块1移动到区块2之后时会发生什么:
#### BLOCK 2
if __name__ == "__main__":
num_procs = 8
multi_iterations = int(mc_iterations / num_procs)
multi_before = time.time()
pool = mp.Pool(processes = num_procs)
multi_result = pool.map(integrate, [multi_iterations]*num_procs)
multi_result = np.array(multi_result).mean()
multi_after = time.time()
multi_duration = np.round(multi_after - multi_before, 3)
error_multi = (wolfram_answer - multi_result)/wolfram_answer
print(num_procs, "threads with", multi_iterations, "iterations each:",
multi_duration, "seconds.")
print("Estimation error:", error_multi)
#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer
print(mc_iterations, "iterations on single-thread:",
single_duration, "seconds.")
print("Estimation error:", error_single)
print("")
输出为:
8 threads with 125000000 iterations each: 54.938 seconds.
Estimation error: 7.42415402896e-06
1000000000 iterations on single-thread: 37.396 seconds.
Estimation error: 9.79800494235e-06
我们又回到了慢速输出,这简直太疯狂了难道不应该解释Python吗?我知道这句话有一百条警告,但我想当然地认为代码是逐行执行的,所以后面的东西(函数、类等之外(不会影响以前的东西,因为它还没有被"查看"过。
那么,在多处理步骤结束后执行的东西如何才能追溯地减慢多处理代码的速度呢?
最后,只需将块1缩进到if __name__ == "__main__"
块内,即可恢复快速行为,因为它当然会:
#### BLOCK 2
if __name__ == "__main__":
num_procs = 8
multi_iterations = int(mc_iterations / num_procs)
multi_before = time.time()
pool = mp.Pool(processes = num_procs)
multi_result = pool.map(integrate, [multi_iterations]*num_procs)
multi_result = np.array(multi_result).mean()
multi_after = time.time()
multi_duration = np.round(multi_after - multi_before, 3)
error_multi = (wolfram_answer - multi_result)/wolfram_answer
print(num_procs, "threads with", multi_iterations, "iterations each:",
multi_duration, "seconds.")
print("Estimation error:", error_multi)
#### BLOCK 1
single_before = time.time()
single = integrate(mc_iterations)
single_after = time.time()
single_duration = np.round(single_after - single_before, 3)
error_single = (wolfram_answer - single)/wolfram_answer
print(mc_iterations, "iterations on single-thread:",
single_duration, "seconds.")
print("Estimation error:", error_single)
print("")
输出为:
8 threads with 125000000 iterations each: 7.293 seconds.
Estimation error: 1.10350027622e-05
1000000000 iterations on single-thread: 31.035 seconds.
Estimation error: 2.53582945763e-05
如果将块1保留在if
块内,但将其移动到定义num_procs
的上方(此处未显示,因为这个问题已经很长了(,那么快速行为也会恢复。
那么,究竟是什么导致了这种行为呢我猜这是线程和进程分支的某种竞争条件,但从我的专业水平来看,我的Python解释器可能会闹鬼。
这是因为您使用的是Windows。在Windows上,每个子流程都是使用'spawn'
方法生成的,该方法本质上启动一个新的python解释器并导入您的模块,而不是分叉流程。
这是一个问题,因为if __name__ == '__main__'
之外的所有代码都被再次执行。如果将多处理代码放在顶层,这可能会导致多处理炸弹,因为它会开始生成进程,直到内存耗尽。
这实际上是文档中警告的
主模块的安全导入
确保新的Python可以安全地导入主模块解释器,而不会造成意外的副作用(例如启动新工艺(。
相反,应该使用
if __name__ == '__main__'
这允许新生成的Python解释器安全地导入单元
在Python 2的旧文档中,该部分曾被称为"Windows"。
添加一些细节,在Windows上,模块在每个工作进程中都是"从头开始"导入的。这意味着模块中的所有都由每个工作者执行。因此,在您的第一个示例中,每个工作进程首先执行"BLOCK 1"。
但你的产出并没有反映出这一点。你应该得到一行像一样的输出
1000000000次单线程迭代:37.448秒。
来自您的8个工作进程中的每一个。但你的输出并没有显示出这一点。也许您使用的IDE会抑制派生进程的输出?如果您在"DOS框"(cmd.exe
窗口(中运行它,则不会抑制输出,并且可以使发生的事情更加清晰。