我目前正在用Python 3.x编写一个图像处理程序,该程序需要以低延迟(<60ms)实时处理帧(30FPS)。我有一个父进程,它读取帧并通过SharedMemory对象将它们发送到多个子进程。子进程完成的计算受CPU限制,在30FPS的情况下不可能在单个内核上运行所有这些计算。但由于它们彼此独立工作,我决定将它们作为单独的流程运行。
目前,我正在使用Pipes向子进程发送命令,最重要的是在帧更新时通知它们。在测量父级的send()命令和子级的recv()命令之间的时间时,延迟总是>100ms。我用了time.time_ns()。
这是一个问题,因为输出馈送现在将总是滞后>100ms+所有子级完成处理所花费的时间(另外20-30ms+所有send()函数之间的延迟)。
该应用程序旨在用于体育直播,因此不能引入如此高的延迟。所以我有两个问题:
-
Python中的管道真的那么慢吗?或者我对它们的执行有问题。(注意:我已经在英特尔i5第九代和苹果M1上测试了延迟)
-
如果Pipes确实这么慢,我在Python中还有其他选项吗?除了求助于某种形式的插座之外?
谢谢。
编辑:
我在这里添加了用于测试管道延迟的代码。
import multiprocessing as mp
import time
def proc(child_conn):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
child_conn.close()
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
刚刚为您编写了一个可能的解决方案,使用多处理对象Process和Queue。
我测量了它的吞吐量速度,处理一个几乎什么都不做的任务平均需要150 mcs
(微秒)。处理只需要从任务中获取整数,然后将其加1并发回。我认为150微秒的延迟应该完全足够你处理30 FPS。
使用队列而不是管道,因为我认为它更适合多任务处理。此外,如果您的时间测量是精确的,那么Queue也比Pipe快660x
倍(150微秒,而延迟为100毫秒)。
您可以注意到,处理循环分批发送任务,这意味着它首先向所有进程发送许多任务,然后才收集所有发送和处理的任务。与一次只发送一个任务,然后收集很少的结果相比,这种批处理使处理变得平稳。
如果您将任务发送到进程,然后在单独的轻量级线程中异步收集结果,那就更好了。这将防止您阻止等待完成任务的最慢进程。
进程通过向它们发送None
任务来发出完成和退出的信号。
在线试用!
def process(idx, in_q, out_q):
while True:
task = in_q.get()
if task is None:
break
out_q.put({'n': task['n'] + 1})
def main():
import multiprocessing, time
queue_size = 1 << 16
procs = []
for i in range(multiprocessing.cpu_count()):
in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
procs.append({
'in_q': in_q,
'out_q': out_q,
'proc': multiprocessing.Process(target = process,
kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
})
procs[-1]['proc'].start()
num_blocks = 1 << 2
block = 1 << 10
assert block <= queue_size
tb = time.time()
for k in range(num_blocks):
# Send tasks
for i in range(block):
for j, proc in enumerate(procs):
proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
# Receive tasks results
for i in range(block):
for proc in procs:
proc['out_q'].get()
print('Processing speed:', round((time.time() - tb) /
(num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
# Send finish signals to processes
for proc in procs:
proc['in_q'].put(None)
# Join processes (wait for exit)
for proc in procs:
proc['proc'].join()
if __name__ == '__main__':
main()
输出:
Processing speed: 150.7 mcs per task
还测量了一次只向所有进程发送一个任务(而不是一次1000个任务)和一次接收一个任务的时间。在这种情况下,延迟为460 mcs
(微秒)。因此,您可以将其视为在使用Queue的最坏情况下,Queue的纯延迟为460 mcs(460 mcs包括send和recv)。
我已经取了您的示例片段,并对其进行了一些修改,使其使用Queue而不是Pipe,得到了0.1 ms
延迟。
请注意,我在一个循环中这样做了5次,因为第一次或第二次尝试初始化了一些与队列相关的东西。
在线试用!
import multiprocessing as mp
import time
def proc(inp_q, out_q):
for i in range(5):
e = inp_q.get()
ts = float(time.time_ns())
out_q.put(ts)
if __name__ == "__main__":
inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
p1 = mp.Process(target=proc, args=(inp_q, out_q))
p1.start()
for i in range(5):
ts = float(time.time_ns())
inp_q.put("START")
ts_end = out_q.get()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
p1.join()
输出:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
此外,在循环中多次运行您的示例会使第二次和其他发送/接收迭代比第一次快得多。
由于懒洋洋地初始化资源,第一次非常慢。大多数算法都是惰性初始化的,这意味着它们只在第一次调用时分配所有需要的资源。当根本不使用算法时,这是为了防止不必要的分配。另一方面,这会使第一次调用变得更慢,因此您必须进行少量的第一次空调用来预热Lazy算法。
在线试用!
import multiprocessing as mp
import time
def proc(child_conn):
for i in range(5):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
for i in range(5):
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
输出:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
以下程序通过管道发送一个简单对象100万次,并测量总运行时间(以秒为单位)和平均发送时间(以毫秒为单位)。我在一个相当旧的Windows桌面上运行,一个英特尔(R)酷睿(TM)i7-4790 CPU@3.60 GHz:
from multiprocessing import Pipe, Process
import time
class Message:
def __init__(self, text):
self.text = text
N = 1_000_000
def worker(recv_connection):
for _ in range(N):
msg = recv_connection.recv()
def main():
recv_connection, send_connection = Pipe(duplex=False)
p = Process(target=worker, args=(recv_connection,))
p.start()
msg = Message('dummy')
start_time = time.time_ns()
for _ in range(N):
send_connection.send(msg)
p.join()
elapsed = time.time_ns() - start_time
print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')
if __name__ == '__main__':
main()
打印:
Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.
这比您正在实现的速度(100ms.)快10000倍,所以我只能得出结论,这一定是您通过管道发送的对象的复杂性
更新
您确实希望使用多处理,但我建议使用多处理池,特别是与imap
方法一起使用的multiprocessing.pool.Pool
实例。这将允许您使用生成器函数来生成下一个要处理的帧,并将其提交到池中进行处理,并在处理后的帧可用时将其返回到主进程,按提交帧的顺序返回。以下概述了基本思想:
from multiprocessing import Pool, cpu_count
import time
def process_frame(frame):
# return processed frame
time.sleep(.1)
return frame.upper()
def generate_frames_for_processing():
for i in range(100):
time.sleep(.033)
yield f'msg{i}'
def main():
# Leave a processor for the main process:
pool = Pool(cpu_count() - 1)
start_time = time.time()
# get processed results as they are returned in order of being processed:
for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
# Do something with returned processed frame
# These will be in the same order as the frames are submitted
...
print(processed_frame)
pool.close()
pool.join()
print('Elapsed:', time.time() - start_time)
if __name__ == '__main__':
main()
打印:
MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282
您可以在imap
调用上指定chunksize参数,但可能不希望这样做。有关详细信息,请参阅文档。