我应该使用多处理、多线程还是其他方法?



我是Python的初学者,我目前正在从事一个项目,由于缺乏更好的术语,我已将其拆分为多个代码"模块"。我的代码有效地连接到 API,接收数据,对该数据执行计算,然后根据计算值返回与 API 的通信。

以下是我将代码拆分为的"模块":

API
  1. 身份验证器 --> 是一个循环,用于检查当前 API 访问代码是否已过期。如果不是到期时间,则不会发生任何反应;如果是过期时间,程序将提交新的访问代码请求,并将新的访问代码存储在计算机的内存中

  2. 数据流 --> 是一个开放的 websockets 连接,它从 API 接收连续的数据流

  3. 数据处理器 ->从数据流中获取数据并计算处理后的值并将其存储在内存中。

  4. 执行器 -->如果使用数据处理器中派生的值满足条件,请使用身份验证器模块生成的 API 密钥并向服务器提交请求以执行某些操作。

我实际上对一次运行所有这些模块很感兴趣。麻烦的是,它们是连续的操作,需要永远一遍又一遍地循环,同时在它们之间传递变量。有没有一种有效的方法让我的所有"模块"同时运行并在彼此之间传递数据?-->我正在研究多处理和多线程,但我有兴趣看看是否有最好的选择。

非常感谢。

如果同时存在网络延迟和大量 CPU 密集型计算,则应考虑同时创建线程池和处理池。处理池将传递给工作线程函数,该函数使用线程池调用,并用于执行 CPU 密集型工作。当然,这比仅使用多处理要复杂一些,但它允许您更便宜地重叠网络检索。这个想法是,例如,如果您有 200 个要检索的 URL 和计算机上的 8 个处理器,您可以创建一个大小为 200 的进程池,以尽可能高效地重叠 200 个 URL 的检索,但当然,您最多只能并行处理 8 个提交的 CPU 密集型部分。但是,创建 200 个进程是一项相对昂贵的操作,尤其是在使用spawn创建新进程(如 Windows)的平台上(您忽略了在您的平台上标记您的问题)。因此,创建大小为 200 的线程池会更优化(即更便宜)。

下面是仅使用多处理池与线程池和多处理池模拟上述情况的基准测试。遗憾的是,在我的平台 Windows 上,您无法创建大小大于 60 的多处理池。这意味着,如果您在 Windows 下运行并且要检索的 URL 超过 60 个,则实际上别无选择,只能使用两种类型的池:

from multiprocessing.pool import ThreadPool, Pool
from functools import partial
import time
def cpu_intensive_calculation(data):
""" takes approximately .25 seconds on my desktop """
QUARTER_SECOND_ITERATIONS = 5_000_000
sum = 0
for _ in range(QUARTER_SECOND_ITERATIONS):
sum += 1
return sum
def fetch_data(url):
""" approximately .3 seconds of "I/O" """
time.sleep(.3)
return ""
def retrieve_url1(url):
retrieved_data = fetch_data(url)
result = cpu_intensive_calculation(retrieved_data)
return result
def benchmark1():
urls = ['x'] * 60
t1 = time.time()
pool = Pool(60)
result = pool.map(retrieve_url1, urls)
t2 = time.time()
print('Multiprocessing only:', t2 - t1)
def retrieve_url2(process_pool, url):
retrieved_data = fetch_data(url)
result = process_pool.apply(cpu_intensive_calculation, args=(retrieved_data,))
return result
def benchmark2():
urls = ['x'] * 60 # 60
t1 = time.time()
thread_pool = ThreadPool(60)
process_pool = Pool()
result = thread_pool.map(partial(retrieve_url2, process_pool), urls)
t2 = time.time()
print('Thread pool and multiprocessing pool:', t2 - t1)
if __name__ == '__main__':
benchmark1()
benchmark2()

指纹:

Multiprocessing only: 5.233615875244141
Thread pool and multiprocessing pool: 4.272637844085693

或者使用协程(PyPI存储库中的包aiohttp)进行网络检索,loop.run_in_executor执行 CPU 密集型工作。

但是,如果您执行的计算相对琐碎,或者在 Python 模块(例如释放全局解释器锁的numpy)中完成,则只需对所有内容使用多线程即可。

更新

以下包括一个使用asyncio协程而不是multithreading的基准测试,它可以处理数百甚至数千个并发Web检索。它使用concurrent.futures模块中的ProcessPoolExecutor类来运行"阻塞"操作,因此我已切换到该类进行多处理:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
import asyncio
import time
def cpu_intensive_calculation(data):
""" takes approximately .25 seconds on my desktop """
QUARTER_SECOND_ITERATIONS = 5_000_000
sum = 0
for _ in range(QUARTER_SECOND_ITERATIONS):
sum += 1
return sum
def fetch_data(url):
""" approximately .3 seconds of "I/O" """
time.sleep(.3)
return ""
def retrieve_url1(url):
retrieved_data = fetch_data(url)
result = cpu_intensive_calculation(retrieved_data)
return result
def benchmark1():
urls = ['x'] * 60
t1 = time.time()
with ProcessPoolExecutor(max_workers=60) as pool:
result = list(pool.map(retrieve_url1, urls))
t2 = time.time()
print('Multiprocessing only:', t2 - t1)
def retrieve_url2(process_pool, url):
retrieved_data = fetch_data(url)
future = process_pool.submit(cpu_intensive_calculation, retrieved_data)
return future.result()
def benchmark2():
urls = ['x'] * 60 # 60
t1 = time.time()
with ThreadPoolExecutor(max_workers=60) as thread_pool:
with ProcessPoolExecutor() as process_pool:
result = list(thread_pool.map(partial(retrieve_url2, process_pool), urls))
t2 = time.time()
print('Thread pool and multiprocessing pool:', t2 - t1)
return result
async def retrieve_url3(url):
await asyncio.sleep(.3) # the I/O portion
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, cpu_intensive_calculation, '') # cpu portion
return result

async def benchmark3():
global executor
urls = ['x'] * 60 # 60
t1 = time.time()
with ProcessPoolExecutor() as executor:
result = await asyncio.gather(*(retrieve_url3(url) for url in urls))
t2 = time.time()
print('asyncio:', t2 - t1)
if __name__ == '__main__':
benchmark1()
benchmark2()
asyncio.run(benchmark3())

指纹:

Multiprocessing only: 5.816989183425903
Thread pool and multiprocessing pool: 4.408833742141724
asyncio: 4.401129961013794

相关内容

最新更新