使用 Python 对本地端点进行并行 API 调用,从而最大限度地发挥 CPU



Python 3.10

我的任务是证明我可以通过调用在Java VM中运行的本地API端点以及"测量和记录吞吐量"来最大化Mac笔记本电脑(10核)的CPU,所有这些都使用Python。 对于并行化,我已经研究并决定根据以下答案使用asyncio:https://stackoverflow.com/a/59385935/7191927

我计划使用htop来显示所有内核已最大化,以便我认为我已经涵盖了该部分。我被绊倒的地方是我实际上需要在代码中做的事情,以最大化 CPU。

这就是我目前所拥有的。 此代码用于调用两个本地 API 终结点(每个终结点仅处理文本块并提取相关术语):

import asyncio
from api import API, DocumentParameters, EndException

def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def get_word_results(data):
api = API(url='http://localhost:8181/rest/words')     
words_data = data        
params = DocumentParameters()
params["content"] = words_data 
try:
result = api.words(params)
except EndException as exception:
print(exception)
return result
@background
def get_language_results(data):
api = API(url='http://localhost:8181/rest/languages')     
language_data = data        
params = DocumentParameters()
params["content"] = language_text_data 
try:
result = api.language(params)
except EndException as exception:
print(exception)
return result
if __name__ == '__main__':
filepath = "/Users/me/stuff.txt"
with open(filepath, 'r') as file: 
data = file.read()
get_word_results(data)
get_language_results(data)
print('Done.')

这就是我的Python知识/经验开始减弱的地方。

那么最有效的方法是什么:

  1. 连续运行此代码,并不断增加线程计数,以尝试最大化 CPU。
  2. 根据要求测量和记录吞吐量。

编辑 1 - 赏金开始。为此,我需要一个可靠的解决方案 - 它最大化 CPU 并提供某种输出来显示这一点,以及正在进行多少次调用并导致最大值。 根据宫城先生在评论中所说的话,听起来multiprocessing是我想要的,要么代替,要么与asyncio获胜者将以最少的代码行数实现。

编辑 2 - 必须使用一个脚本/程序完成,运行一次,而不是多次运行相同的脚本。

使用asyncio是一个不错的选择。我会将其与aiohttp配对以调用 java 服务。下面是一个具有可等待test()方法的框架脚本,该方法允许您运行任意数量的调用,并根据需要并行运行任意数量的调用:

import asyncio
import aiohttp
async def test():
# examples: will run a total of 'task_count' calls limited to 'concurrency' running in parallel 
task_count = 1000
concurrency = 10
async with aiohttp.ClientSession('http://localhost:8181') as session:
semaphore = asyncio.Semaphore(concurrency)
tasks = [asyncio.ensure_future(call_service(session, semaphore, task_id)) for task_id in range(0, task_count)]
await asyncio.gather(*tasks)
async def call_service(session, semaphore, task_id):
async with semaphore:
# start your timer here
async with session.get('/rest/words') as resp:
await resp.text()
# stop your timer here and log/accumulate stats

这里的原则是创建一个异步任务列表,这些任务可能同时执行。但是,您想知道最大化 CPU 需要多少次,因此提供了一个concurrency参数,允许您使用信号量限制同时发生的调用数。

我已经评论了您可以在哪里添加语句以开始测量时间。

为了快速运行相同的python文件多次。最多只需一次运行 20 个 python 脚本或其他东西。

相关内容

最新更新