这是Python.org中的确切代码。如果您注释掉time.sleep()
,它将崩溃并进行长异常回溯我想知道为什么
而且,我确实理解为什么Python.org将其包含在他们的示例代码中。但是人为地创造了";工作时间";通过time.sleep()
删除时不应破坏代码。在我看来,time.sleep()
提供了某种旋转时间。但正如我所说,我想从那些可能真正知道答案的人那里知道。
一条用户评论要求我填写更多关于这件事发生的环境的细节。它在OSX Big Sur 11.4上。使用来自Python.org的Python 3.95的干净安装(没有Homebrew等(。在一辆面包车里从Pycharm里面跑。我希望这有助于加深对局势的理解。
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' %
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
#time.sleep(0.5*random.random()) # <--- time.sleep() commented out
return a * b
def plus(a, b):
#time.sleep(0.5*random.random()). # <--- time.sleep() commented out
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
如果这对任何人都有帮助的话,这就是回溯:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
Traceback (most recent call last):
File "<string>", line 1, in <module>
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory
这是一个技术细分。
这是一种比赛条件,主要过程结束,在一些孩子有机会完全启动之前退出。只要孩子完全开始,就有一些机制可以确保他们顺利关闭,但在这段时间之间存在不安全的情况。竞争条件可能非常依赖于系统,因为这取决于操作系统和硬件来调度不同的线程,以及它们完成工作的速度。
以下是流程启动时发生的情况。。。在创建子进程的早期,它会将自己注册到主进程中,这样当主进程退出时,它将是join
ed或terminate
d,具体取决于它是否为守护进程(multiprocessing.util._exit_function
(。在导入多处理时,已向atexit
模块注册了此退出函数。
同样在创建子进程的过程中,会打开一对Pipe
,用于将Process
对象传递给子解释器(其中包括要执行的函数及其参数(。这需要与子级共享2个文件句柄,并且这些文件句柄也被注册为使用atexit
关闭。
在启动阶段,当主进程在子进程有机会从管道中读取所有必要的数据(取消对Process
对象的酸洗(之前退出时,就会出现问题。如果主进程首先关闭管道,然后等待join
的子进程,那么我们就有问题了。子级将继续旋转新的python实例,直到它需要读入包含您的函数和它应该运行的参数的Process
对象。它将尝试从已经关闭的管道中读取,这是一个错误。
如果所有的孩子都有机会完全启动,你永远不会看到这种情况,因为那个管道只用于启动。推迟一段时间,在某种程度上保证所有的孩子都有时间完全启动,这就是解决这个问题的方法。手动调用join
将通过在调用任何atexit
处理程序之前等待子进程来提供这种延迟。此外,任何数量的处理延迟都意味着主线程中的q.get
将不得不等待一段时间,这也给了子线程在关闭之前启动的时间。我从来没能重现你遇到的问题,但据推测,你看到了所有TASKS
的输出("Process-1说mul(19,7(=133"((。只有一个或两个子进程最终完成了所有工作,允许主进程get
所有结果,并在其他子进程完成启动之前完成。
编辑:
关于发生了什么,这个错误是明确的,但我仍然不知道它是如何发生的。。。据我所知,在加入或终止所有active_children
之后,而不是之前通过_run_finalizers(0)
在_exit_function
中调用_run_finalizers()
时,应该关闭文件句柄
第2版:
_run_finalizers
实际上似乎永远不会调用Popen.finalizer
来关闭管道,因为exitpriority
就是None
。我对这里发生的事情很困惑,我想我需要睡一觉…
显然@user2357112supportsMonica走在了正确的轨道上。如果您在退出程序之前加入进程,它将完全解决问题。此外,@Aaron的回答非常清楚为什么这样可以解决问题!
我按照建议添加了以下代码,它完全解决了在其中使用time.sleep()
的需要。
首先,我收集了启动时的所有流程:
processes: list[Process] = []
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
p = Process(target=worker, args=(task_queue, done_queue))
p.start()
processes.append(p)
然后在项目结束时,我加入了他们,如下所示:
# Join the processes
for p in processes:
p.join()
完全解决了问题。谢谢你的建议。