多处理生成过程而不使用池



我正在尝试使用多处理库来生成新进程,而不使用池,也不创建僵尸。

在Unix上,当进程完成但尚未加入时,它将成为僵尸。永远不应该有很多,因为每次进程启动(或调用active_children()(全部完成将加入尚未加入的进程。同时呼叫完成的进程的CCD_ 2将加入该进程。即便如此显式连接所有你开始。

这个实现是一个更大脚本的短版本,它在几个小时后创建僵尸:

from multiprocessing import Process
import time
def target(task):
print(f"Working for {task*2} seconds ...")
time.sleep(task*2)
if __name__ == '__main__':
processes = 4
list_process = [None] * processes
targets = [[2] for i in range(10)]
list_process = [None] * processes
while targets:
for i in range(processes):
p = list_process[i]
if not (p and p.is_alive()):
list_process[i] = Process(target=target, args=(targets.pop(0)))
list_process[i].start()
if p:
p.join()
for process in list_process:
if process:
process.join()

在更大的版本中,list_process只有僵尸,不能处理更多的任务。

更新1

多亏了Booboo,我才能更好地了解正在发生的事情:

from multiprocessing import Process
import time
def target(task):
print(f"Working for {task*2} seconds ...")
time.sleep(task*2)
if __name__ == '__main__':
started_count = 0
joined_count = 0
joined_list = []
processes = 4
list_process = [None] * processes
targets = [[2] for i in range(10)]
list_process = [None] * processes
while targets:
for i in range(processes):
p = list_process[i]
if not (p and p.is_alive()):
list_process[i] = Process(target=target, args=(targets.pop(0)))
list_process[i].start()
print(list_process[i].pid)
started_count += 1
if p:
assert(not p.is_alive())
p.join()
joined_list.append(list_process[i].pid)
joined_count += 1
for process in list_process:
if process:
process.join()
joined_list.append(list_process[i].pid)
joined_count += 1
print(f'Final started count: {started_count}, final joined count: {joined_count}')
print(joined_list)

输出:

20604
24108
1272
23616
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
18492
17348
19992
6216
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
18744
26240
Working for 4 seconds ...
Working for 4 seconds ...
Final started count: 10, final joined count: 10
[18492, 17348, 19992, 6216, 18744, 26240, 6216, 6216, 6216, 6216]

我有10个进程加入了,但有些进程不是好的(没有为任务调用pid6216,第一个进程没有加入(,导致进程没有加入,为什么?

我以前看过这段代码,就而言,它似乎是正确的。我对它进行了修改,以跟踪进程启动和连接的次数,并添加了一个断言;健全性检查":

from multiprocessing import Process
import time
def target(task):
print(f"Working for {task*2} seconds ...")
time.sleep(task*2)
if __name__ == '__main__':
started_count = 0
joined_count = 0
processes = 4
list_process = [None] * processes
targets = [[2] for i in range(10)]
list_process = [None] * processes
while targets:
for i in range(processes):
p = list_process[i]
if not (p and p.is_alive()):
list_process[i] = Process(target=target, args=(targets.pop(0)))
list_process[i].start()
started_count += 1
print('started count:', started_count)
if p:
assert(not p.is_alive())
p.join()
joined_count += 1
print('joined count:', joined_count)
for process in list_process:
if process:
process.join()
joined_count += 1
print('joined count:', joined_count)
print(f'Final started count: {started_count}, final joined count: {joined_count}')

打印:

started count: 1
started count: 2
started count: 3
started count: 4
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
started count: 5
joined count: 1
started count: 6
joined count: 2
started count: 7
joined count: 3
started count: 8
joined count: 4
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
started count: 9
joined count: 5
started count: 10
joined count: 6
joined count: 7
Working for 4 seconds ...
Working for 4 seconds ...
joined count: 8
joined count: 9
joined count: 10
Final started count: 10, final joined count: 10

你的程序中是否有其他你没有发布的内容导致了问题?

实施流程池

如果我可以提出一个建议:您实现流程池的方法相当低效。如果你有100个任务要提交,那么你就创建了100个流程。这是而不是处理池的定义。的确,您控制了并行度,但未能重用进程,这是池的核心理想。下面演示如何创建一个由4个进程组成的池,这些进程可以根据需要执行任意多的任务。当所有任务都完成后,您只需要加入这4个流程。这可能会大大有助于解决你的僵尸问题:

from multiprocessing import Process, Queue
import time
def target(queue):
while True:
task = queue.get()
if task is None: # "end of file" indicator
break
print(f"Working for {task*2} seconds ...")
time.sleep(task*2)
if __name__ == '__main__':
N_PROCESSES = 4
processes = []
queue = Queue()
for _ in range(N_PROCESSES):
processes.append(Process(target=target, args=(queue,)))
for process in processes:
process.start()
# Write tasks to the job queue:
for _ in range(10):
queue.put(2)
# And write an "end of file" indicator for each process in the pool:
for _ in range(N_PROCESSES):
queue.put(None)
# Wait for processes to complete:
for process in processes:
process.join()

打印:

Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ..

请注意,您还可以向每个进程传递用于输出结果的第二个队列。只需确保在加入进程之前将此队列的结果get即可。

相关内容

  • 没有找到相关文章

最新更新