我正在尝试同时在两个内核上运行两个Python函数。每个进程运行一个很长的循环(理论上是无限循环)。重要的是它们同时保持同步,从长远来看,即使是最轻微的延迟也会导致问题。
我认为我的问题是我像这样串行运行它们
# define the processes and assign them functions
first_process = multiprocessing.Process(name='p1', target='first_function')
second_process = multiprocessing.Process(name='p2', target='second_function')
# start the processes
first_process.start()
second_process.start()
我在每个函数的开头打印time.time()
来测量时差。输出是:
first function time: 1553812298.9244068
second function time: 1553812298.9254067
相差0.0009999275207519531
秒。如前所述,从长远来看,这种差异将对产生重大影响。
综上所述,如何同时在两个不同的内核上运行两个函数?如果Python不能做到这一点,我应该看看还有什么其他选项?
你所要求的并不是普通操作系统应该提供的。你有操作系统调度干扰,内核迁移,通过CPU热力学改变时钟速度,不同的缓存命中和未命中等等。可以提高进程优先级并将进程固定到某些内核(为此查看 psutil),但您不太可能从中看到稳定的改进。您的操作系统通常比这里做得更好。
对于非常困难的实时限制,您必须研究RTOS。此外,您必须选择一种中级语言(例如C/C++),它允许细粒度的内存管理(减少代价高昂的CPU缓存未命中)。无论如何,您可能要求以不同的方式(XY问题)做一些事情,所以当我继续向您展示如何获得一些同步时,不要理解为对你真正尝试解决的任何问题的整个方法的认可。
这里选择的武器是multiprocessing.Barrier
.这是一个同步原语,它允许指定许多需要在屏障实例上调用.wait()
的执行程序(线程/进程)。当指定数量的执行程序调用了wait()
时,屏障会同时释放所有等待的执行程序。这样,所有执行程序都可以在此类屏障操作上同步。
请注意,一个这样的操作不足以满足您的要求。我前面提到的操作系统因素总是会重新带来混乱,CPU 时间将再次偏离该同步点。这意味着您必须一次又一次地以一定的间隔重复同步。当然,这会花费您一些吞吐量。同步间隔越短,平均发散越小。
下面可以看到实现该技术的两个函数。syncstart_foo
只同步一次(如@blhsing的答案),sync_foo
每sync_interval
迭代一次。进行所有迭代后,函数将time.time()
返回到父级,在那里计算时间增量。
import time
from multiprocessing import Process, Barrier, Queue
def syncstart_foo(outqueue, barrier, n_iter):
barrier.wait() # synchronize only once at start
for _ in range(int(n_iter)):
pass # do stuff
outqueue.put(time.time())
def sync_foo(outqueue, barrier, n_iter, sync_interval):
for i in range(int(n_iter)):
if i % sync_interval == 0: # will sync first time for i==0
barrier.wait()
# do stuff
outqueue.put(time.time())
用于运行基准测试的帮助程序函数:
def test_sync():
"""Run test for `sync_foo`."""
special_args = (SYNC_INTERVAL,)
_run_test(sync_foo, special_args)
def test_syncstart():
"""Run test for `syncstart_foo`."""
_run_test(syncstart_foo)
def _run_test(f, special_args=None):
outqueue = Queue()
barrier = Barrier(N_WORKERS)
args = (outqueue, barrier, N_ITER)
if special_args:
args += special_args
pool = [Process(target=f, args=args) for _ in range(N_WORKERS)]
print(f'starting test for {f.__name__}')
for p in pool:
p.start()
results = [outqueue.get() for _ in range(N_WORKERS)]
for p in pool:
p.join()
print(f"delta: {(abs(results[1] - results[0])) * 1e3:>{6}.{2}f} ms")
print("-" * 60)
主条目:
if __name__ == '__main__':
N_WORKERS = 2
N_ITER = 50e6 # 1e6 == 1M
SYNC_INTERVAL = 250_000 # synchronize every x iterations
for _ in range(5):
test_syncstart()
test_sync()
示例输出:
starting test for syncstart_foo
delta: 28.90 ms
------------------------------------------------------------
starting test for sync_foo
delta: 1.38 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 70.33 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.33 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 4.45 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.17 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 168.80 ms
------------------------------------------------------------
starting test for sync_foo
delta: 0.30 ms
------------------------------------------------------------
starting test for syncstart_foo
delta: 79.42 ms
------------------------------------------------------------
starting test for sync_foo
delta: 1.24 ms
------------------------------------------------------------
Process finished with exit code 0
您可以看到,像syncstart_foo
一样同步一次是不够的。
您可以为每个进程指定一个multiprocessing.Queue
对象,并在进程的函数开始时,使用multiprocessing.Queue.put
将一个项目放入另一个进程的队列中,然后立即尝试使用multiprocessing.Queue.get
取消其自己的队列。由于multiprocessing.Queue.get
阻塞,直到队列中有项目,这有效地同步了两个进程:
import multiprocessing
import time
def func(queue_self, queue_other):
queue_other.put(None)
queue_self.get()
print(time.time())
q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=func, args=(q1, q2))
p2 = multiprocessing.Process(target=func, args=(q2, q1))
if __name__ == '__main__':
p1.start()
p2.start()
示例输出:
1553814412.7520192
1553814412.7520192