我正在尝试通过子进程并行运行生成器进程。但是当我尝试这样做时,我看到带有生成器的函数是由父进程处理的!!
from multiprocessing import Process
import os import time
class p(Process):
def __init__(self):
Process.__init__(self)
def run(self):
print('PID:', os.getpid())
def genfunc(self):
time.sleep(1)
yield os.getpid()
p1 = p()
p2 = p()
p1.start()
p2.start()
print('Iterators:')
print('Ran by:',next(p1.genfunc()))
print('Ran by:',next(p2.genfunc()))
输出:
PID: 20383
PID: 20384
Iterators:
Ran by:20382
Ran by:20382
我的目标是通过子进程运行生成器函数,从而向父进程生成结果。
在 run() 调用中使用 yield stmt 没有成功,我尝试了上述方法。
有人可以帮助我实现我的目标吗?
在两个进程之间实现通信的一种方法是使用Queue
实例。在下面的示例中,我没有创建两个单独的进程,而是选择创建一个包含两个进程的进程池:
from multiprocessing import Pool, Manager
import os
def p(q):
pid = os.getpid()
q.put(pid)
for i in range(5):
q.put(i)
q.put(None) # signify "end of file"
def main():
manager = Manager()
q1 = manager.Queue()
q2 = manager.Queue()
with Pool(2) as pool: # create a pool of 2 processes
pool.apply_async(p, args=(q1,))
pool.apply_async(p, args=(q2,))
q1_eof = False
q2_eof = False
while not q1_eof or not q2_eof:
if not q1_eof:
obj = q1.get() # blocking get
if obj is None:
q1_eof = True
else:
print(obj)
if not q2_eof:
obj = q2.get() # blocking get
if obj is None:
q2_eof = True
else:
print(obj)
if __name__ == '__main__':
main()
指纹:
5588
24104
0
0
1
1
2
2
3
3
4
4
使用显式Process
实例而不是创建池的代码如下(我不倾向于对Process
类进行子类化,因为这需要更多的编码:)
from multiprocessing import Process, Queue
import os
def p(q):
pid = os.getpid()
q.put(pid)
for i in range(5):
q.put(i)
q.put(None) # signify "end of file"
def main():
q1 = Queue()
q2 = Queue()
p1 = Process(target=p, args=(q1,))
p1.start()
p2 = Process(target=p, args=(q2,))
p2.start()
q1_eof = False
q2_eof = False
while not q1_eof or not q2_eof:
if not q1_eof:
obj = q1.get() # blocking get
if obj is None:
q1_eof = True
else:
print(obj)
if not q2_eof:
obj = q2.get() # blocking get
if obj is None:
q2_eof = True
else:
print(obj)
p1.join()
p2.join()
if __name__ == '__main__':
main()
重要说明
两个编码示例(一个使用进程池,另一个不使用)使用两种不同类型的队列实例。
请参阅 Python 多处理。Queue vs multiprocessing.manager()。队列()
在所有情况下,您始终可以使用multiprocessing.manager().Queue()
(我通常这样做),但可能会损失一些效率。
我希望谈到使用生成器进行处理,您真正希望完成接下来的事情:
- 主进程通过一些生成器懒惰地生成一些任务,任务由一些数据(
arg
)表示。 - 这些任务可能由生成器非常缓慢地生成,例如通过从互联网获取数据块,因此应该在准备好处理后立即逐个处理。
- 主进程将这些任务发送到多个子进程进行处理。
- 儿童的处理也可能需要缓慢而随机的时间。
- 儿童应报告一些结果(成功处理的结果数据或在失败时编码错误)。
- 主进程也应该懒惰地收集所有结果,即一旦准备好报告它们。
- 主进程内的结果可以按照与生成的顺序严格相同的顺序收集(严格顺序
True
),或者在处理后立即以任意顺序收集(严格顺序False
),第二个变体可能要快得多。 - 为了提高效率,应使用所有 CPU 内核,每个内核一个进程。
出于所有这些目的,我创建了可用于特定问题的示例模板代码:
在线试用!
def ProcessTask(arg):
import time, os
print('Started task', arg[0], arg[1], 'by', os.getpid())
time.sleep(arg[1])
print('Finished task', arg[0], arg[1], 'by', os.getpid())
return (arg[0], arg[1] * 2)
def Main():
import multiprocessing as mp
def GenTasks(n):
import random, os, time
for i in range(n):
t = round(random.random() * 2., 3)
print('Created task', i, t, 'by', os.getpid())
yield (i, t)
time.sleep(random.random())
num_tasks = 4
for strict_order in [True, False]:
print('nIs strict order', strict_order)
with mp.Pool() as pool:
for res in (pool.imap_unordered, pool.imap)[strict_order](
ProcessTask, GenTasks(num_tasks)
):
print('Result from task', res)
if __name__ == '__main__':
Main()
输出:
Is strict order True
Created task 0 0.394 by 10536
Created task 1 0.357 by 10536
Started task 0 0.394 by 8740
Started task 1 0.357 by 5344
Finished task 1 0.357 by 5344
Finished task 0 0.394 by 8740
Result from task (0, 0.788)
Result from task (1, 0.714)
Created task 2 0.208 by 10536
Started task 2 0.208 by 5344
Finished task 2 0.208 by 5344
Result from task (2, 0.416)
Created task 3 0.937 by 10536
Started task 3 0.937 by 8740
Finished task 3 0.937 by 8740
Result from task (3, 1.874)
Is strict order False
Created task 0 1.078 by 10536
Started task 0 1.078 by 7256
Created task 1 0.029 by 10536
Started task 1 0.029 by 5440
Finished task 1 0.029 by 5440
Result from task (1, 0.058)
Finished task 0 1.078 by 7256
Result from task (0, 2.156)
Created task 2 1.742 by 10536
Started task 2 1.742 by 5440
Created task 3 0.158 by 10536
Started task 3 0.158 by 7256
Finished task 3 0.158 by 7256
Result from task (3, 0.316)
Finished task 2 1.742 by 5440
Result from task (2, 3.484)
PS:
- 在前面的代码中,当使用
multiprocessing
时,通常主进程和子进程都使用相同的单个模块脚本,主进程和子进程都从执行整个脚本开始。if __name__ == '__main__':
块仅由主进程运行,模块的其余代码由主进程和子进程执行。 - 好的做法是将 main 执行所需的所有内容放入一个函数(在我的情况下
Main()
),将子级执行到另一个函数(在我的情况下ProcessTask()
),将其他一些函数和变量放入由 main 和子共享和运行的全局范围内(我在代码中没有任何共享)。 - 处理函数(在我的代码中
ProcessTask()
)应该在模块的全局范围内。 - 有关
multiprocessing
的其他文档可在此处获得。