使用多处理的 Python



我正在尝试在python 3.6中使用多处理。我有一个for loop运行具有不同参数的method。目前,它一次运行一个,这需要相当多的时间,所以我正在尝试使用多处理。这是我所拥有的:

def test(self):
for key, value in dict.items():
pool = Pool(processes=(cpu_count() - 1))
pool.apply_async(self.thread_process, args=(key,value))
pool.close()
pool.join()

def thread_process(self, key, value):
# self.__init__()
print("For", key)

我认为我的代码使用 3 个进程来运行一个method但我想每个进程运行 1 个方法,但我不知道这是如何完成的。顺便说一句,我正在使用 4 个内核。

你在 for 循环的每次迭代中创建一个池。事先创建一个池,应用要在多处理中运行的进程,然后加入它们:

from multiprocessing import Pool, cpu_count
import time
def t():
# Make a dummy dictionary
d = {k: k**2 for k in range(10)}
pool = Pool(processes=(cpu_count() - 1))
for key, value in d.items():
pool.apply_async(thread_process, args=(key, value))
pool.close()
pool.join()

def thread_process(key, value):
time.sleep(0.1)  # Simulate a process taking some time to complete
print("For", key, value)
if __name__ == '__main__':
t()

您不会用数据填充multiprocessing.Pool- 而是在每个循环中重新初始化池。在您的情况下,您可以使用Pool.map()为您完成所有繁重的工作:

def thread_process(args):
print(args)
def test():
pool = Pool(processes=(cpu_count() - 1))
pool.map(thread_process, your_dict.items())
pool.close()
if __name__ == "__main__":  # important guard for cross-platform use
test()

此外,考虑到所有这些self论点,我认为您正在从类实例中抢走它,如果是这样 - 不要,除非您知道自己在做什么。由于 Python 中的多处理本质上是多处理(与多线程不同),因此您不能共享内存,这意味着您的数据在进程之间交换时会被腌制,这意味着任何无法被腌制的东西(如实例方法)都不会被调用。您可以在此答案中阅读有关该问题的更多信息。

我认为我的代码使用 3 个进程来运行一个方法,但我想每个进程运行 1 个方法,但我不知道这是如何完成的。顺便说一句,我正在使用 4 个内核。

不,您实际上在这里使用正确的语法来利用 3 个内核在每个内核上独立运行任意函数。你不能神奇地利用 3 个内核在一个任务上协同工作,而不明确地将其作为算法本身的一部分/编码,你自己经常使用线程(在 python 中的工作方式与它们在语言之外的工作方式不同)。

但是,您需要在每个循环中重新初始化池,您需要执行以下操作才能实际正确执行此操作:

cpus_to_run_on = cpu_count() - 1
pool = Pool(processes=(cpus_to_run_on)
# don't call a dictionary a dict, you will not be able to use dict() any 
# more after that point, that's like calling a variable len or abs, you 
# can't use those functions now
pool.map(your_function, your_function_args)
pool.close()

如果您想更好地了解其工作原理,请查看python multiprocessing文档以获取更具体的信息。 在python下,你不能利用线程来使用默认的CPython解释器进行多处理。 这是因为一种称为全局解释器锁的东西,它停止了来自 python 本身的并发资源访问。 GIL 不存在于该语言的其他实现中,也不是其他语言(如 C 和 C++)必须处理的东西(因此您实际上可以并行使用线程来协同完成任务,这与 CPython 不同)

Python 通过使用多处理模块时简单地创建多个解释器实例来解决这个问题,并且实例之间的任何消息传递都是通过在进程之间复制数据来完成的(即两个解释器实例通常不会触及相同的内存)。 然而,这在具有误导性名称的线程模块中不会发生,由于称为上下文切换的过程,该模块通常实际上会减慢进程速度。 今天的线程具有有限的用处,但提供了一种比异步 python 更容易绕过非 GIL 锁定进程(如套接字和文件读取/写入)的方法。

除此之外,您的多处理还有一个更大的问题。 写入标准输出。 你不会得到你想要的收益。 想想吧。 您的每个进程都"打印"数据,但其全部显示在一个终端/输出屏幕中。 因此,即使您的流程正在"打印",它们也不是真正独立地执行此操作,并且信息必须合并回文本界面所在的另一个进程(即您的控制台)。因此,这些进程将要发送到的任何内容写入某种缓冲区,然后必须将其复制到另一个进程(正如我们从多处理工作原理中学到的那样),然后该进程将获取缓冲数据并输出它。

通常,虚拟程序使用打印作为显示这些进程的执行之间如何没有顺序的手段,它们可以在不同的时间完成,它们并不意味着展示多核处理的性能优势。

本周我尝试了一些多处理。我发现在 python3 中进行多处理的最快方法是使用imap_unordered,至少在我的场景中是这样。下面是一个脚本,你可以尝试使用你的方案来确定最适合你的方案:

import multiprocessing
NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'imap_unordered'  # 'imap_unordered' or 'starmap' or 'apply_async'
def process_chunk(a_chunk):
print(f"processig mp chunk {a_chunk}")
return a_chunk

map_jobs = [1, 2, 3, 4]
result_sum = 0
if MP_FUNCTION == 'imap_unordered':
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
for i in pool.imap_unordered(process_chunk, map_jobs):
result_sum += i
elif MP_FUNCTION == 'starmap':
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
try:
map_jobs = [(i, ) for i in map_jobs]
result_sum = pool.starmap(process_chunk, map_jobs)
result_sum = sum(result_sum)
finally:
pool.close()
pool.join()
elif MP_FUNCTION == 'apply_async':
with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
result_sum = sum(result_sum)
print(f"result_sum is {result_sum}")

我发现starmap在性能上并没有落后太多,在我的场景中,它使用了更多的CPU,最终速度有点慢。希望这个样板有所帮助。

最小的工作示例:有序执行,尽快结果,最多n个线程

以下代码片段与以下边条件并行执行一些函数:

  • 最大线程数与 CPU 内核数一样
  • 作业可以按优先级执行
  • 结果一经公布即出

法典

import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy
N_THREADS = mp.cpu_count()
def process_single_task(task_name: str):
n_sec = random.randint(0, 4)
print(f"start {task_name=}, {n_sec=}")
time.sleep(n_sec)
print(f"end {task_name=}, {n_sec=}")
return task_name, n_sec
def fct_to_multiprocessing(
fct: Callable, fct_kwargs: Dict[str, Any], job_id: int, results: DictProxy, semaphore: mp.Semaphore):
if semaphore is not None:
semaphore.acquire()
results[job_id] = fct(**fct_kwargs)
if semaphore is not None:
semaphore.release()
def process_all_tasks(tasks: List[str]):
manager = mp.Manager()
results = manager.dict()  # <class 'multiprocessing.managers.DictProxy'>
sema = mp.Semaphore(N_THREADS)
jobs = {}
job_ids = list(range(len(tasks)))
for job_id in job_ids:
task = tasks[job_id]
jobs[job_id] = mp.Process(
target=fct_to_multiprocessing,
kwargs={
"fct": process_single_task, "fct_kwargs": {"task_name": task},
"job_id": job_id, "results": results, "semaphore": sema
}
)
jobs[job_id].start()
for job_id in job_ids:
job = jobs[job_id]
job.join()
result = results[job_id]
print(f"job {tasks[job_id]} returned {result=}")

if __name__ == "__main__":
tasks = list("abcdefghijklmnopqrstuvwxyz")
process_all_tasks(tasks)

输出

start task_name='a', n_sec=4
start task_name='c', n_sec=2
end task_name='c', n_sec=2
start task_name='b', n_sec=2
end task_name='a', n_sec=4
start task_name='d', n_sec=1
job a returned result=('a', 4)
end task_name='b', n_sec=2
start task_name='e', n_sec=0
end task_name='e', n_sec=0
job b returned result=('b', 2)
job c returned result=('c', 2)
start task_name='f', n_sec=0
end task_name='f', n_sec=0
start task_name='j', n_sec=2
end task_name='d', n_sec=1
start task_name='g', n_sec=1
job d returned result=('d', 1)
job e returned result=('e', 0)
job f returned result=('f', 0)
end task_name='g', n_sec=1
start task_name='i', n_sec=3
job g returned result=('g', 1)
end task_name='j', n_sec=2
start task_name='h', n_sec=1
end task_name='h', n_sec=1
start task_name='o', n_sec=4
job h returned result=('h', 1)
end task_name='i', n_sec=3
start task_name='n', n_sec=2
job i returned result=('i', 3)
job j returned result=('j', 2)
end task_name='n', n_sec=2
start task_name='k', n_sec=2
end task_name='o', n_sec=4
start task_name='r', n_sec=1
end task_name='r', n_sec=1
start task_name='m', n_sec=1
end task_name='k', n_sec=2
start task_name='l', n_sec=4
job k returned result=('k', 2)
end task_name='m', n_sec=1
start task_name='s', n_sec=3
end task_name='s', n_sec=3
start task_name='p', n_sec=3
end task_name='l', n_sec=4
start task_name='q', n_sec=0
end task_name='q', n_sec=0
start task_name='t', n_sec=0
end task_name='t', n_sec=0
job l returned result=('l', 4)
job m returned result=('m', 1)
job n returned result=('n', 2)
job o returned result=('o', 4)
start task_name='u', n_sec=4
end task_name='p', n_sec=3
start task_name='v', n_sec=0
end task_name='v', n_sec=0
start task_name='x', n_sec=4
job p returned result=('p', 3)
job q returned result=('q', 0)
job r returned result=('r', 1)
job s returned result=('s', 3)
job t returned result=('t', 0)
end task_name='u', n_sec=4
start task_name='y', n_sec=4
job u returned result=('u', 4)
job v returned result=('v', 0)
end task_name='x', n_sec=4
start task_name='z', n_sec=0
end task_name='z', n_sec=0
start task_name='w', n_sec=1
end task_name='w', n_sec=1
job w returned result=('w', 1)
job x returned result=('x', 4)
end task_name='y', n_sec=4
job y returned result=('y', 4)
job z returned result=('z', 0)

** Process exited - Return Code: 0 **
Press Enter to exit terminal

免责声明:因此time.sleep(n_sec)代表一些计算繁重的函数。如果它实际上只是等待,asyncio通常是一个更好的选择(即使增加这里的线程数量也应该可以完成这项工作)。

多个处理的示例。希望对您有所帮助:

from multiprocessing import Process  
def fun_square(x):
x_square = x**2
print('x_square: ', x_square) 
def x_pow_y(x,y):
x_pow_y = x**y
print('x_pow_y: ', x_pow_y)
def fun_qube(z):
z_qube = z*z*z
print('z_qube: ', z_qube)

def normal_fun():
print("Normal fun is working at same time...")

p1 = Process(target = fun_square, args=(5,)).start() #args=(x,)
p2 = Process(target = x_pow_y, args=(2,4,)).start() #args=(x,y,)
p3 = Process(target = fun_qube(5)).start() #fun_qube(z)
p4 = Process(target = normal_fun).start()

相关内容

  • 没有找到相关文章

最新更新