Efficient multiprocessing with Python on Windows



Suppose we split a number into separate domains, e.g. 100 split into [0,25] [25,50] [50,75] [75,100]. We send each of those 4 lists to one of 4 separate processes for computation and then recombine the results into a single answer for the number 100. We repeat this many times in a row, and we need the processes to work as a unit, splitting, say, 1000 numbers into separate domains like [0,25] [25,50] [50,75] [75,100]. An efficiency problem appears if the processes have to be shut down in order to deliver their answers as one group. Because Windows, unlike Unix, cannot fork processes, we are forced to use the "spawn" start method, and spawn is slow at creating processes. So I thought: why not keep the processes open and keep passing data to them, instead of opening and closing them for every iteration of the parallel group? The example code below does exactly that. It keeps the processes open as a Consumer class whose run() keeps asking for the next task with .get() on a JoinableQueue (in a while loop):

import multiprocessing

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill shutdown of .get() loop with break
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        for i in range(self.b):
            if self.a % i == 0:
                return 0
        return 1

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    # Number of consumers equal to system cpu_count
    num_consumers = multiprocessing.cpu_count()

    # Make a list of Consumer process objects ready to be started.
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs for the Consumer processes' run() while-loop to .get() as a workload:
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, 100))     # Similar jobs would be reiterated before the poison pill.

    # We start to .get() the results in a different loop-
    for _ in range(num_jobs):       # -so the above loop enqueues all jobs without-
        result = results.get()      # -waiting for the previous .put() to .get() first.

    # Add a poison pill for each consumer
    for i in range(num_consumers):  # We only do this when all computation is done.
        tasks.put(None)             # Here we break the loops of the open Consumer processes.

This code is just an example. In other variants of it, where many more iterations of tasks.put() and results.get() are performed, what is needed is a way to make an enqueued Task object return via an external call before it has fully computed its answer and returned on its own. That would free up resources when the answer for a given split-number group has already been obtained from another process. The Task object needs the __call__ method so that the object passed to tasks.put(Task(i, 100)) can be invoked like a function. I have spent the past two weeks trying to find an efficient way to do this. Do I need a completely different approach? Don't misread my dilemma: the code I am using works, it is just not as efficient as I would like on Microsoft Windows. Any help would be greatly appreciated.

Does the Task object live in the same process as the Consumer() that dequeues it? If so, couldn't I tell every running Consumer's run() loop to abandon its currently running Task object, without shutting down the while loop with a poison pill, so that it could immediately accept another Task() without the process being closed and reopened? When you open and close thousands of processes for iterated computations, the cost really adds up and wastes time. I have tried Event(), Manager(), and extra Queue()s. There seems to be no efficient way to intervene in a Task object from outside and make it return immediately to its parent Consumer(), so that it stops wasting resources once an answer returned by one of the other Consumers has made the remaining Tasks' computations irrelevant, since they all work together on a single number that has been divided into groups.
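For reference, here is a minimal sketch of the kind of cooperative interruption being asked about (my illustration, not code from this post; the worker name scan_range, the chunk size, and the Manager-based Event are all assumptions). Nothing outside a process can force a running Python function to return early, so the work itself has to poll a shared flag:

import multiprocessing

def scan_range(args):
    # Test divisibility of n over [start, stop), bailing out early
    # as soon as any worker has set the shared 'abort' event.
    n, start, stop, abort = args
    for i in range(max(start, 2), stop):
        if abort.is_set():      # another worker already found a divisor
            return None
        if n % i == 0:
            abort.set()         # tell the other workers to give up
            return i
    return None

if __name__ == '__main__':
    n = 1_000_000               # an obviously composite number for the demo
    step = 250_000
    with multiprocessing.Manager() as mgr:
        abort = mgr.Event()     # a picklable Event proxy that Pool workers can share
        chunks = [(n, lo, min(lo + step, n), abort) for lo in range(0, n, step)]
        with multiprocessing.Pool(4) as pool:
            divisors = [d for d in pool.map(scan_range, chunks) if d is not None]
    print('composite' if divisors else 'prime')

A plain multiprocessing.Event() cannot be pickled through a Pool's task queue, which is why this sketch goes through a Manager; with long-lived Process objects like the Consumers above, an Event can instead be passed once to the Process constructor.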

What you have done is implement your own multiprocessing pool, but why? Were you not aware of the concurrent.futures.ProcessPoolExecutor and multiprocessing.pool.Pool classes, the latter of which is actually better suited to your particular problem?

Both classes implement multiprocessing pools and various methods for submitting tasks to the pool and getting results back from those tasks. In your particular case, though, the tasks you submit are all attempts to solve the same problem and you are only interested in the first available result; once you have it, you need to be able to terminate any remaining running tasks. Only multiprocessing.pool.Pool allows you to do that.
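To make that limitation concrete, here is a sketch of my own (the worker names are made up) using concurrent.futures.ProcessPoolExecutor: you can stop waiting as soon as the first future completes, but Future.cancel() does nothing to futures that are already running, so the slower worker keeps a core busy until it finishes on its own and the total elapsed time tracks the slow worker (about 3 seconds here) rather than the fast one:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
import time

def slow_worker(x):
    time.sleep(3)  # emulate working on the problem
    return 9       # the solution

def fast_worker(x):
    time.sleep(1)  # emulate working on the problem
    return 9       # the solution

if __name__ == '__main__':
    t = time.time()
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(slow_worker, 1), executor.submit(fast_worker, 2)]
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print(next(iter(done)).result())    # first available answer, after about 1 second
        for f in not_done:
            f.cancel()                      # returns False: already-running tasks keep going
    # exiting the with-block waits for slow_worker; there is no terminate() equivalent
    print('Total elapsed time:', time.time() - t)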

The code below submits tasks with the method Pool.apply_async. This call does not block; it returns an AsyncResult instance with a blocking get method you can call to obtain the result of the submitted task. But since you would typically be submitting many tasks, we do not know which of these instances to call get on. The solution is instead to use the callback argument of apply_async to specify a function that will be invoked asynchronously with a task's return value as soon as it becomes available. The question then becomes how to communicate that result back. There are two ways:

Method 1: via a global variable

from multiprocessing import Pool
import time

def worker1(x):
    time.sleep(3) # emulate working on the problem
    return 9 # the solution

def worker2(x):
    time.sleep(1) # emulate working on the problem
    return 9 # the solution

def callback(answer):
    global solution
    # gets called with the return value of each submitted task as it completes;
    # since we are just interested in the first returned result, save it and stop the pool:
    solution = answer
    pool.terminate() # kill all remaining tasks

if __name__ == '__main__':
    t = time.time()
    pool = Pool(2) # just two processes in the pool for demo purposes
    # submit two tasks:
    pool.apply_async(worker1, args=(1,), callback=callback)
    pool.apply_async(worker2, args=(2,), callback=callback)
    # wait for all tasks to terminate:
    pool.close()
    pool.join()
    print(solution)
    print('Total elapsed time:', time.time() - t)

Prints:

9
Total elapsed time: 1.1378364562988281

Method 2: via a queue

from multiprocessing import Pool
from queue import Queue
import time

def worker1(x):
    time.sleep(3) # emulate working on the problem
    return 9 # the solution

def worker2(x):
    time.sleep(1) # emulate working on the problem
    return 9 # the solution

def callback(solution):
    # gets called with the return value of each submitted task as it completes;
    # the callback runs in a thread of the main process, so a plain queue.Queue suffices.
    # Since we are just interested in the first returned result, write it to the queue:
    q.put_nowait(solution)

if __name__ == '__main__':
    t = time.time()
    q = Queue()
    pool = Pool(2) # just two processes in the pool for demo purposes
    # submit two tasks:
    pool.apply_async(worker1, args=(1,), callback=callback)
    pool.apply_async(worker2, args=(2,), callback=callback)
    # wait for first returned result from callback:
    solution = q.get()
    print(solution)
    pool.terminate() # kill all tasks in the pool
    print('Total elapsed time:', time.time() - t)

Prints:

9
Total elapsed time: 1.1355643272399902

Update

Even under Windows, the time to create and recreate a pool may be relatively insignificant compared to how long the tasks take to complete, especially in later iterations, i.e. for larger values of n. If you are calling the same worker function each time, a third method is to use the pool method imap_unordered. I have also included some code that measures, on my desktop, the overhead of starting a new pool instance:

from multiprocessing import Pool
import time

def worker(x):
    time.sleep(x) # emulate working on the problem
    return 9 # the solution

if __name__ == '__main__':
    # POOLSIZE = multiprocessing.cpu_count()
    POOLSIZE = 8 # on my desktop
    # how long does it take to start a pool of size 8?
    t1 = time.time()
    for i in range(16):
        pool = Pool(POOLSIZE)
        pool.terminate()
    t2 = time.time()
    print('Average pool creation time: ', (t2 - t1) / 16)

    # POOLSIZE number of calls:
    arguments = [7, 6, 1, 3, 4, 2, 9, 6]
    pool = Pool(POOLSIZE)
    t1 = time.time()
    results = pool.imap_unordered(worker, arguments)
    it = iter(results)
    first_solution = next(it)
    t2 = time.time()
    pool.terminate()
    print('Total elapsed time:', t2 - t1)
    print(first_solution)

Prints:

Average pool creation time:  0.053139880299568176
Total elapsed time: 1.169790506362915
9
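If the same worker is reused across iterations, a pattern along the following lines (my sketch; the worker body and argument list are stand-ins) pays the roughly 0.05 s pool start-up cost on every outer iteration, and in exchange terminate() can abandon the remaining tasks as soon as the fastest one returns:

from multiprocessing import Pool
import time

def worker(x):
    time.sleep(x)  # stand-in for testing one sub-range of a number
    return 9       # the solution

if __name__ == '__main__':
    for n in range(3):                 # three outer iterations, one fresh pool each
        t = time.time()
        pool = Pool(8)                 # recreated every iteration; costs roughly 0.05 s
        results = pool.imap_unordered(worker, [7, 6, 1, 3, 4, 2, 9, 6])
        first_solution = next(iter(results))   # block only until the fastest worker returns
        pool.terminate()               # abandon the slower workers immediately
        print('iteration', n, '->', first_solution, 'in', round(time.time() - t, 2), 's')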

Update 2

This is a tricky problem: you have multiple processes working on pieces of the same puzzle. As soon as one process discovers, for example, that the number is divisible by some value in the range it was passed, there is no point in the other processes continuing to test values in their own ranges. You can do one of three things. You can do nothing and simply let the processes finish before starting the next iteration, but that delays the next iteration. I have already suggested that you terminate the processes, which frees the processors, but that requires creating new processes, which you find unsatisfactory.

The only other possibility I can think of is presented below, using your own multiprocessing approach. A shared-memory variable named stop is created, made available to each process as a global, and set to 0 before every iteration. When a Task determines that its return value will be 0, so that there is no point in the other Tasks running in the other processes continuing, it sets stop to 1. That means the Tasks must periodically check the value of stop and return when it has been set to 1, which of course adds extra cycles to the processing. In the demo below I actually queue 100 tasks for 8 processors, but the last 92 tasks will immediately see that stop has already been set and should return on their first iteration.

By the way, the original code used a multiprocessing.JoinableQueue instead of a multiprocessing.Queue for the task queue and called task_done on it as messages were taken off. Yet join was never called on that queue (it would tell you when every message had been taken off), which defeats the whole purpose of creating one. In fact a JoinableQueue is not needed at all: the main process submits num_jobs jobs, expects num_jobs messages on the result queue, and can simply loop to pull the expected number of results off it. I have substituted a plain Queue for the JoinableQueue and left the original lines in, commented out. Also, the Consumer processes can be created as daemon processes (with argument daemon=True); they then terminate automatically when all non-daemon processes, i.e. the main process, terminate, which removes the need for the special "poison pill" None task messages. I have made that change too, again leaving the original lines commented out for comparison.
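As an aside, here is a minimal sketch of my own (not part of the code below) showing what a JoinableQueue is actually for when you do want its extra feature: the producer calls join(), which blocks until task_done() has been called once for every item that was put():

import multiprocessing

def worker(q):
    while True:
        item = q.get()
        print('processed', item)
        q.task_done()          # exactly one task_done() per get()

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    p = multiprocessing.Process(target=worker, args=(q,), daemon=True)
    p.start()
    for i in range(5):
        q.put(i)
    q.join()                   # blocks until every item has been marked task_done()
    # the daemon worker is killed automatically when the main process exits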

import multiprocessing

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, stop):
        # make ourself a daemon process:
        multiprocessing.Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.stop = stop

    def run(self):
        global stop
        stop = self.stop
        while True:
            next_task = self.task_queue.get()
            """
            if next_task is None:
                # Poison pill shutdown of .get() loop with break
                #self.task_queue.task_done()
                break
            """
            answer = next_task()
            #self.task_queue.task_done()
            self.result_queue.put(answer)
        # return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        global stop
        # start the range from 1 to avoid dividing by 0:
        for i in range(1, self.b):
            # how frequently should this check be made?
            if stop.value == 1:
                return 0
            if self.a % i == 0:
                stop.value = 1
                return 0
        return 1

if __name__ == '__main__':
    # Establish communication queues
    #tasks = multiprocessing.JoinableQueue()
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    # Number of consumers equal to system cpu_count
    num_consumers = multiprocessing.cpu_count()

    # Make a list of Consumer process objects ready to be started.
    stop = multiprocessing.Value('i', 0)
    consumers = [Consumer(tasks, results, stop) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs for the Consumer processes' run() while-loop to .get() as a workload:
    # many more jobs than processes, but they will stop immediately once they check stop.value:
    num_jobs = 100
    stop.value = 0 # make sure it is 0 before an iteration
    for i in range(num_jobs):
        tasks.put(Task(i, 100)) # Similar jobs would be reiterated before the poison pill.

    # We start to .get() the results in a different loop:
    results = [results.get() for _ in range(num_jobs)]
    print(results)
    print(0 in results)
    """
    # Add a poison pill for each consumer
    for i in range(num_consumers): # We only do this when all computation is done.
        tasks.put(None) # Here we break the loops of the open Consumer processes.
    """

Prints:

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
True

I finally found a solution!

import multiprocessing

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, state):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.state = state

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                self.task_queue.task_done()
                break
            # answer = next_task() is where the Task object is being called.
            # Python runs line by line, so execution waits here until 'answer' is assigned.
            # Put the if-else on the same line so it skips calling Task when state.is_set():
            answer = next_task() if not self.state.is_set() else 0
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        for i in range(self.b):
            if self.a % i == 0:
                return 0
        return 1

def initialize(n_list, tasks, results, states):
    # 'number' (the upper bound of the divisor test) comes from the full program
    # and is not defined in this excerpt.
    sum_list = []
    for i in range(cpu_cnt):
        tasks.put(Task(n_list[i], number))
    for _ in range(cpu_cnt):
        sum_list.append(int(results.get()))
        if 0 in sum_list:
            states.set()
    if 0 in sum_list:
        states.clear()
        return None
    else:
        states.clear()
        return number

if __name__ == '__main__':
    states = multiprocessing.Event() # ADD THIS BOOLEAN FLAG EVENT!
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    cpu_cnt = multiprocessing.cpu_count()
    # Add the states Event() to the Consumer argument list:
    consumers = [Consumer(tasks, results, states) for i in range(cpu_cnt)]
    for w in consumers:
        w.start()

    n_list = [x for x in range(1000)]
    iter_list = []
    for _ in range(1000):
        iter_list.append(initialize(n_list, tasks, results, states))

    # num_jobs and num_consumers also come from the full program (not defined in this excerpt):
    for _ in range(num_jobs):
        result = results.get()

    for i in range(num_consumers):
        tasks.put(None)

When the open Consumer processes assign the answer with an if-else on the same line as the next_task() call, they stop invoking Task objects as soon as the state Event() flag is set, because execution is held at that line until 'answer' is assigned by the enqueued Task object. This is a great workaround! It makes the Task objects interruptible inside the Consumer's while loop, which runs them through the 'answer' assignment. After more than two weeks I have a solution that speeds things up! I tested it on a working version of my code and it is faster. With an approach like this you can keep many processes open indefinitely and pass any number of different Task objects through the Consumers' joinable-queue loop, processing large amounts of data in parallel at remarkable speed. It is almost as if the code turns all the cores into one "super core", with every process staying open on every core and cooperating on whatever iterative I/O stream is needed!

Here is example output from one of my Python multiprocessing programs using this method on 8 hyperthreaded cores:

Enter prime number FUNCTION:n+n-1
Enter the number for 'n' START:1
Enter the number of ITERATIONS:100000
Progress: ########## 100%
Primes:
ƒ(2) = 3
ƒ(3) = 5
ƒ(4) = 7
ƒ(6) = 11
ƒ(7) = 13
ƒ(9) = 17
etc etc...
ƒ(99966) = 199931
ƒ(99967) = 199933
ƒ(99981) = 199961
ƒ(99984) = 199967
ƒ(100000) = 199999
Primes found: 17983
Prime at end of list has 6 digits.
Overall process took 1 minute and 2.5 seconds.

All 17,983 primes from 1 to 200,000 (except 2) found by full modulus testing in about 1 minute.

On a 128-thread AMD Threadripper 3990X it takes about 8 seconds.

Here is another output, again on 8 hyperthreaded cores:

Enter prime number FUNCTION:((n*2)*(n**2)**2)+1
Enter the number for 'n' START:1
Enter the number of ITERATIONS:1000
Progress: ########## 100%
Primes:
ƒ(1) = 3
ƒ(3) = 487
ƒ(8) = 65537

etc... etc...

ƒ(800) = 655360000000001
ƒ(839) = 831457011176399
ƒ(840) = 836423884800001
ƒ(858) = 929964638281537
ƒ(861) = 946336852720603
ƒ(884) = 1079670712526849
ƒ(891) = 1123100229130903
ƒ(921) = 1325342566697203
ƒ(953) = 1572151878119987
ƒ(959) = 1622269605897599
ƒ(983) = 1835682572370287
Primes found: 76
Prime at end of list has 16 digits.
Overall process took 1 minute and 10.6 seconds.

Latest update