我正在做一个音频播放器,它从udp套接字接收样本,一切都很好。但当我实现Lost Concealment算法时,玩家未能以预期的速率保持沉默(每10ms发送一个160字节的列表)。
当用pyaudio播放音频时,使用阻塞调用write来播放一些样本,我注意到它在样本的持续时间内平均被阻塞。所以我创建了一个新的专用流程来播放样本。
主进程处理音频的输出流,并使用多处理将结果发送给该进程。管我决定使用多处理。管道,因为它应该比其他方式更快。
不幸的是,当我在虚拟机上运行该程序时,比特率是我在快速PC上获得的比特率的一半,这并没有达到目标比特率。
经过一些测试,我得出结论,导致延迟的是Pipe的函数send
。
我做了一个简单的基准测试脚本(见下文),以了解向流程传输的各种方法之间的差异。该脚本持续发送[b'x00'*160]
5秒,并统计字节对象总共发送了多少字节。我测试了以下发送方法:"不发送"、多处理。管道,多处理。队列,多处理。经理,多处理。监听器/客户端,最后是socket.socket:
我的"快速"PC运行窗口7 x64的结果:
test_empty : 1516076640
test_pipe : 58155840
test_queue : 233946880
test_manager : 2853440
test_socket : 55696160
test_named_pipe: 58363040
VirtualBox的VM来宾运行Windows 7 x64,主机运行Windows 7 x64:的结果
test_empty : 1462706080
test_pipe : 32444160
test_queue : 204845600
test_manager : 882560
test_socket : 20549280
test_named_pipe: 35387840
使用的脚本:
from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time
FS = "{:<15}:{:>15}"
def test_empty():
s = time.time()
sent = 0
while True:
data = b'x00'*160
lst = [data]
sent += len(data)
if time.time()-s >= 5:
break
print(FS.format("test_empty", sent))
def pipe_void(pipe_in):
while True:
msg = pipe_in.recv()
if msg == []:
break
def test_pipe():
pipe_out, pipe_in = Pipe()
p = Process(target=pipe_void, args=(pipe_in,))
p.start()
s = time.time()
sent = 0
while True:
data = b'x00'*160
lst = [data]
pipe_out.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
pipe_out.send([])
p.join()
print(FS.format("test_pipe", sent))
def queue_void(q):
while True:
msg = q.get()
if msg == []:
break
def test_queue():
q = Queue()
p = Process(target=queue_void, args=(q,))
p.start()
s = time.time()
sent = 0
while True:
data = b'x00'*160
lst = [data]
q.put(lst)
sent += len(data)
if time.time()-s >= 5:
break
q.put([])
p.join()
print(FS.format("test_queue", sent))
def manager_void(l, lock):
msg = None
while True:
with lock:
if len(l) > 0:
msg = l.pop(0)
if msg == []:
break
def test_manager():
with Manager() as manager:
l = manager.list()
lock = manager.Lock()
p = Process(target=manager_void, args=(l, lock))
p.start()
s = time.time()
sent = 0
while True:
data = b'x00'*160
lst = [data]
with lock:
l.append(lst)
sent += len(data)
if time.time()-s >= 5:
break
with lock:
l.append([])
p.join()
print(FS.format("test_manager", sent))
def socket_void():
addr = ('127.0.0.1', 20000)
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_socket():
addr = ('127.0.0.1', 20000)
listener = Listener(addr, "AF_INET")
p = Process(target=socket_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_socket", sent))
def named_pipe_void():
addr = '\\.\pipe\Test'
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_named_pipe():
addr = '\\.\pipe\Test'
listener = Listener(addr, "AF_PIPE")
p = Process(target=named_pipe_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_named_pipe", sent))
if __name__ == "__main__":
test_empty()
test_pipe()
test_queue()
test_manager()
test_socket()
test_named_pipe()
问题
- 如果"队列"使用"管道",在这种情况下它比"管道"快多少?这与Python多处理-管道与队列的问题相矛盾
- 如何在发送延迟较低的情况下,将恒定的比特率流从一个进程添加到另一个进程
更新1
在我的程序中,在试用了Queues而不是Pipes之后我得到了巨大的鼓舞。
在我的电脑上,使用Pipes我得到了+-16000 B/s,使用Queues我得到了+/-750万B/s。在虚拟机上,我的速度从+-13000 B/s提高到650万B/s。使用Queue代替Pipe大约会增加500倍的字节。
当然,我不会以每秒数百万字节的速度播放,我只会以正常的速率播放声音。(在我的情况下是16000 B/s,与上面的值一致)
但关键是,我可以将速率限制在我想要的范围内,同时仍有时间完成其他计算(如从套接字接收、应用声音算法等)
我不能肯定,但我认为您要处理的问题是同步I/O与异步I/O。我的猜测是,Pipe以某种方式结束了同步,而Queue以异步结束。为什么一方以某种方式违约,另一方以另一种方式违约,这个问题和答案可能会更好地回答:
python管道的同步/异步行为