我试图回答在python中创建线程与创建进程的开销有多大的问题。我修改了一个类似问题的代码,基本上是用两个线程运行一个函数,然后用两个进程运行同一个函数并报告时间。
import time, sys
NUM_RANGE = 100000000
from multiprocessing import Process
import threading
def timefunc(f):
t = time.time()
f()
return time.time() - t
def multiprocess():
class MultiProcess(Process):
def __init__(self):
Process.__init__(self)
def run(self):
# Alter string + test processing speed
for i in xrange(NUM_RANGE):
a = 20 * 20
for _ in xrange(300):
MultiProcess().start()
def multithreading():
class MultiThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
# Alter string + test processing speed
for i in xrange(NUM_RANGE):
a = 20 * 20
for _ in xrange(300):
MultiThread().start()
print "process run time" + str(timefunc(multiprocess))
print "thread run time" + str(timefunc(multithreading))
然后我得到了7.9秒的多处理和7.9秒的多线程
我想回答的主要问题是,在Linux上,对成千上万的网络请求使用多线程或多处理是否合适。根据这段代码,它们在启动时间方面似乎是一样的,但进程的内存使用量可能要大得多?
您的代码不适合对进程和线程之间的启动时间进行基准测试。多线程Python代码(在CPython中)意味着单核。在一个线程中执行的任何Python代码都将在该线程持有全局解释器锁(GIL)时排除该进程中所有其他线程的继续操作。这意味着,只要涉及Python字节码,就只能与线程进行并发,而不能进行真正的并行。
您的示例主要是对特定的CPU绑定工作负载性能进行基准测试(在紧密循环中运行计算),这是您无论如何都不会使用线程的。如果您想衡量创建开销,您必须(尽可能)从基准测试中剥离除创建本身之外的任何内容。
TL;DR
启动一个线程(以Ubuntu18.04为基准)比启动一个进程便宜很多倍。
与线程启动相比,使用指定的start_methods启动进程需要:
- 叉子:长约33倍
- forkserver:延长约6693x
- 产卵:延长约7558x
底部的完整结果。
基准
我最近升级到了Ubuntu 18.04,并测试了一个脚本,希望它更接近事实。请注意,这段代码是Python 3。
格式化和比较测试结果的一些实用程序:
# thread_vs_proc_start_up.py
import sys
import time
import pandas as pd
from threading import Thread
import multiprocessing as mp
from multiprocessing import Process, Pipe
def format_secs(sec, decimals=2) -> str:
"""Format subseconds.
Example:
>>>format_secs(0.000_000_001)
# Out: '1.0 ns'
"""
if sec < 1e-6:
return f"{sec * 1e9:.{decimals}f} ns"
elif sec < 1e-3:
return f"{sec * 1e6:.{decimals}f} µs"
elif sec < 1:
return f"{sec * 1e3:.{decimals}f} ms"
elif sec >= 1:
return f"{sec:.{decimals}f} s"
def compare(value, base):
"""Return x-times relation of value and base."""
return f"{(value / base):.2f}x"
def display_results(executor, result_series):
"""Display results for Executor."""
exe_str = str(executor).split(".")[-1].strip(''>')
print(f"nresults for {exe_str}:n")
print(result_series.describe().to_string(), "n")
print(f"Minimum with {format_secs(result_series.min())}")
print("-" * 60)
基准函数如下。对于n_runs
中的每一个测试,都会创建一个新管道。一个新的进程或线程(一个执行器)启动,目标函数calc_start_up_time
立即返回时间差。仅此而已。
def calc_start_up_time(pipe_in, start):
pipe_in.send(time.perf_counter() - start)
pipe_in.close()
def run(executor, n_runs):
results = []
for _ in range(int(n_runs)):
pipe_out, pipe_in = Pipe(duplex=False)
exe = executor(target=calc_start_up_time, args=(pipe_in,
time.perf_counter(),))
exe.start()
# Note: Measuring only the time for exe.start() returning like:
# start = time.perf_counter()
# exe.start()
# end = time.perf_counter()
# would not include the full time a new process needs to become
# production ready.
results.append(pipe_out.recv())
pipe_out.close()
exe.join()
result_series = pd.Series(results)
display_results(executor, result_series)
return result_series.min()
它的构建是从终端开始的,带有start_method和作为命令行参数传递的运行次数。基准测试将始终使用指定的start_method(在Ubuntu 18.04:fork,spawn,forkserver上可用)运行进程启动的n_runs
,然后与线程启动的n_runs
进行比较。结果集中在最小值上,因为它们显示了可能的速度。
if __name__ == '__main__':
# Usage:
# ------
# Start from terminal with start_method and number of runs as arguments:
# $python thread_vs_proc_start_up.py fork 100
#
# Get all available start methods on your system with:
# >>>import multiprocessing as mp
# >>>mp.get_all_start_methods()
start_method, n_runs = sys.argv[1:]
mp.set_start_method(start_method)
mins = []
for executor in [Process, Thread]:
mins.append(run(executor, n_runs))
print(f"Minimum start-up time for processes takes "
f"{compare(*mins)} "
f"longer than for threads.")
结果
在我生锈的机器上安装n_runs=1000
:
# Ubuntu 18.04 start_method: fork
# ================================
results for Process:
count 1000.000000
mean 0.002081
std 0.000288
min 0.001466
25% 0.001866
50% 0.001973
75% 0.002268
max 0.003365
Minimum with 1.47 ms
------------------------------------------------------------
results for Thread:
count 1000.000000
mean 0.000054
std 0.000013
min 0.000044
25% 0.000047
50% 0.000051
75% 0.000058
max 0.000319
Minimum with 43.89 µs
------------------------------------------------------------
Minimum start-up time for processes takes 33.41x longer than for threads.
# Ubuntu 18.04 start_method: spawn
# ================================
results for Process:
count 1000.000000
mean 0.333502
std 0.008068
min 0.321796
25% 0.328776
50% 0.331763
75% 0.336045
max 0.415568
Minimum with 321.80 ms
------------------------------------------------------------
results for Thread:
count 1000.000000
mean 0.000056
std 0.000016
min 0.000043
25% 0.000046
50% 0.000048
75% 0.000065
max 0.000231
Minimum with 42.58 µs
------------------------------------------------------------
Minimum start-up time for processes takes 7557.80x longer than for threads.
# Ubuntu 18.04 start_method: forkserver
# =====================================
results for Process:
count 1000.000000
mean 0.295011
std 0.007157
min 0.287871
25% 0.291440
50% 0.293263
75% 0.296185
max 0.361581
Minimum with 287.87 ms
------------------------------------------------------------
results for Thread:
count 1000.000000
mean 0.000055
std 0.000014
min 0.000043
25% 0.000045
50% 0.000047
75% 0.000064
max 0.000251
Minimum with 43.01 µs
------------------------------------------------------------
Minimum start-up time for processes takes 6693.44x longer than for threads.
这取决于。。。也许"两者都有"可能是你想要的答案。
python中的多进程使用linux中的标准fork()调用来复制主进程。在最小程序的情况下,可能没有太多数据,但根据最终程序的结构,可以说,它可能是更多的数据。在最小的情况下,进程内存开销非常小。
线程不会有内存开销问题,但除了启动时间之外,它还有另一个潜在的问题,你可能需要担心。。。GIL。如果你的线程在等待I/O时被阻塞,那么GIL可能不会成为问题,但如果你只是像在测试中一样运行一个循环,一次只能运行2个线程。。。。
换句话说;尽管你在测试中获得了相同的时间,但有很多事情是像这样简单的测试无法捕捉到的。
对于一个正在运行的程序来说,正确的答案可能不是担心启动时间,但它可能更依赖于
- 每个线程或进程将要做什么
- 它需要访问什么内存和状态,锁定会成为一个问题吗
- 在python中,GIL对工作负载会有问题吗(一次运行2个线程就足够了)
- 将进程足迹乘以进程数量是否为可接受的内存量
我遵循的一条基本经验法则是,如果线程/进程主要在I/O上被阻塞(等待网络流量或其他什么),请使用线程。如果您有更重的计算需求,并且内存不是一个问题,请使用进程。
该规则的一个例外是我希望如何处理进程或线程的内存和状态。当你开始谈论大量的线程和像这样的进程时,你可能会考虑内存访问/锁争用。。。
但现实地说,如果没有更多的数据,很难提出一个好的建议。并行编程是很多人都会做的事情之一,但很少有人真正理解(根据我的经验)。
一些需要研究的额外内容可能是重组流程,以减少线程数量。通常,在制作网络服务器和客户端时,我最终会使用线程,并有一个侦听器和发送器线程,它们要么阻塞队列,要么阻塞套接字等待处理。您可能希望减少只为队列提供消息的侦听器和发送器,从而限制开销。我认为Python3.5+中有一个新的异步库,它也可以简化你的生活。
我知道我并没有真正回答你的问题,但我希望我提供了一些东西来查找和查看。
希望能有所帮助!
要回答您的问题,我们需要了解python中线程和多处理的一些基础知识。事实证明,问题不在于启动开销,而在于每个系统如何在系统资源上分配运行负载。
首先,python中的线程与Linux中的线程不同。Linux为每个线程创建一个新的轻量级进程,这些进程可以在不同的CPU核心上运行,python脚本及其线程在任何给定时刻都在同一个CPU核心中运行。如果你想在python中实现真正的多处理,你必须使用多处理接口。
要演示以上内容,请运行Linux系统监视器,选择resources选项卡,然后在不同的终端窗口中,尝试运行我在下面插入的两个代码片段中的每一个。资源选项卡显示每个CPU核心上的负载。
第二个重要问题是,您希望同时处理数千个传入连接。为此,您可能需要多处理接口,但您可以容纳的进程和连接数量可能会受到限制,无论是在Linux中配置的,还是由于调度或资源(例如硬件)方面的瓶颈。
如果您选择一次不让大量进程处于活动状态,则处理此问题的一种方法是创建固定数量的进程,将它们存储在列表中,然后在它们进入时将传入连接传递给它们。当所有进程都很忙时,请等待。为此,您将需要至少一个计数信号量。
如果您想在连接进入时创建进程,您可以再次使用计数信号量来限制一次运行的进程数量。将计数信号量初始化为最大数量,为创建的每个进程递减,并在进程退出时递增。如上所述,当您达到允许的最大进程数时,您需要等待。
好的,下面是线程和多处理的代码示例。第一个启动5个线程。第二次启动5个进程。您可以通过一次编辑来更改这些,以达到100、1000等。每个循环中的整数处理循环可以让您在Linux系统监视器程序中看到负载。
#!/usr/bin/python
# Parallel code with shared variables, using threads
from threading import Lock, Thread
from time import sleep
# Variables to be shared across threads
counter = 0
run = True
lock = Lock()
# Function to be executed in parallel
def myfunc():
# Declare shared variables
global run
global counter
global lock
# Processing to be done until told to exit
while run:
n = 0
for i in range(10000):
n = n+i*i
print( n )
sleep( 1 )
# Increment the counter
lock.acquire()
counter = counter + 1
lock.release()
# Set the counter to show that we exited
lock.acquire()
counter = -1
lock.release()
print( 'thread exit' )
# ----------------------------
# Launch the parallel function in a set of threads
tlist = []
for n in range(5):
thread = Thread(target=myfunc)
thread.start()
tlist.append(thread)
# Read and print the counter
while counter < 5:
print( counter )
n = 0
for i in range(10000):
n = n+i*i
print( n )
#sleep( 1 )
# Change the counter
lock.acquire()
counter = 0
lock.release()
# Read and print the counter
while counter < 5:
print( counter )
n = 0
for i in range(10000):
n = n+i*i
print( n )
#sleep( 1 )
# Tell the thread to exit and wait for it to exit
run = False
for thread in tlist:
thread.join()
# Confirm that the thread set the counter on exit
print( counter )
这是多处理版本:
#!/usr/bin/python
from time import sleep
from multiprocessing import Process, Value, Lock
def myfunc(counter, lock, run):
while run.value:
sleep(1)
n=0
for i in range(10000):
n = n+i*i
print( n )
with lock:
counter.value += 1
print( "thread %d"%counter.value )
with lock:
counter.value = -1
print( "thread exit %d"%counter.value )
# -----------------------
counter = Value('i', 0)
run = Value('b', True)
lock = Lock()
plist = []
for n in range(5):
p = Process(target=myfunc, args=(counter, lock, run))
p.start()
plist.append(p)
while counter.value < 5:
print( "main %d"%counter.value )
n=0
for i in range(10000):
n = n+i*i
print( n )
sleep(1)
with lock:
counter.value = 0
while counter.value < 5:
print( "main %d"%counter.value )
sleep(1)
run.value = False
for p in plist:
p.join()
print( "main exit %d"%counter.value)