Python 3.4多处理队列比Pipe快,出乎意料



我正在做一个音频播放器,它从udp套接字接收样本,一切都很好。但当我实现Lost Concealment算法时,玩家未能以预期的速率保持沉默(每10ms发送一个160字节的列表)。

当用pyaudio播放音频时,使用阻塞调用write来播放一些样本,我注意到它在样本的持续时间内平均被阻塞。所以我创建了一个新的专用流程来播放样本。

主进程处理音频的输出流,并使用多处理将结果发送给该进程。管我决定使用多处理。管道,因为它应该比其他方式更快。

不幸的是,当我在虚拟机上运行该程序时,比特率是我在快速PC上获得的比特率的一半,这并没有达到目标比特率。

经过一些测试,我得出结论,导致延迟的是Pipe的函数send

我做了一个简单的基准测试脚本(见下文),以了解向流程传输的各种方法之间的差异。该脚本持续发送[b'x00'*160] 5秒,并统计字节对象总共发送了多少字节。我测试了以下发送方法:"不发送"、多处理。管道,多处理。队列,多处理。经理,多处理。监听器/客户端,最后是socket.socket:

我的"快速"PC运行窗口7 x64的结果:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

VirtualBox的VM来宾运行Windows 7 x64,主机运行Windows 7 x64:的结果

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

使用的脚本:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time
FS = "{:<15}:{:>15}"

def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'x00'*160
        lst = [data]
        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))

def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break

def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))

def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break

def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()
    print(FS.format("test_queue", sent))

def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break

def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()
        print(FS.format("test_manager", sent))

def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break

def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()
    print(FS.format("test_socket", sent))

def named_pipe_void():
    addr = '\\.\pipe\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break

def test_named_pipe():
    addr = '\\.\pipe\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()
    print(FS.format("test_named_pipe", sent))

if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

问题

  • 如果"队列"使用"管道",在这种情况下它比"管道"快多少?这与Python多处理-管道与队列的问题相矛盾
  • 如何在发送延迟较低的情况下,将恒定的比特率流从一个进程添加到另一个进程

更新1

在我的程序中,在试用了Queues而不是Pipes之后我得到了巨大的鼓舞

在我的电脑上,使用Pipes我得到了+-16000 B/s,使用Queues我得到了+/-750万B/s。在虚拟机上,我的速度从+-13000 B/s提高到650万B/s。使用Queue代替Pipe大约会增加500倍的字节。

当然,我不会以每秒数百万字节的速度播放,我只会以正常的速率播放声音。(在我的情况下是16000 B/s,与上面的值一致)
但关键是,我可以将速率限制在我想要的范围内,同时仍有时间完成其他计算(如从套接字接收、应用声音算法等)

我不能肯定,但我认为您要处理的问题是同步I/O与异步I/O。我的猜测是,Pipe以某种方式结束了同步,而Queue以异步结束。为什么一方以某种方式违约,另一方以另一种方式违约,这个问题和答案可能会更好地回答:

python管道的同步/异步行为

最新更新