不轮询的基于选择的套接字循环



我在编写IO缓冲区和套接字之间传输数据的线程时遇到了问题。让它运行没有任何问题,但不是我想要的方式。下面是代码的草图:

s = socket(...) # some connection
in_buffer = b'' # consumed by other thread
out_buffer = b'' # produced by other thread
while True:
    (r, w, x) = select([s], [s], [s])
    if r:
        in_buffer += s.recv(RECV_LIMIT)
    if w:
        sent = s.send(out_buffer)
        out_buffer = out_buffer[sent:]
    if x:
        break

这样做的问题是,它在空闲时消耗了一个完整的CPU。原因是套接字在大多数时候是可写的,特别是在空闲的时候。select()立即返回,什么也不做,再次调用select(),什么也不做,等等。有一个简单的解决办法,当你没有任何东西可写时,不要检查可写套接字:

... # dito
while True:
    if out_buffer:
        (r, w, x) = select([s], [s], [s])
    else:
        (r, w, x) = select([s], [], [s])
    ... # dito

这是有效的,但它有一个不同的问题:当空闲时,这在select()上无限阻塞。如果我向输出缓冲区添加一些内容,我需要以某种方式从accept()调用唤醒线程,但是如何唤醒呢?为了记录,我当前的解决方案稍微改变了评估:

while True:
    (r, w, x) = select([s], [s], [s])
    if x:
        break
    elif r:
        in_buffer += s.recv(RECV_LIMIT)
    elif w:
        if out_buffer:
            sent = s.send(out_buffer)
            out_buffer = out_buffer[sent:]
        else:
            sleep(0.001)

简而言之,当确实无事可做时,插入延迟。毫秒甚至足以不消耗1%的CPU。类似的方法是对select()调用使用超时,然后重新检查输出数据是否存在。不过,这两种解决方案都不好,因为两者都可以有效地归结为轮询,而轮询很糟糕。那么,如何在不轮询的情况下编写这样一个可移植的IO线程呢?

注意:一种方法是添加另一个文件描述符,我将在其上创建人工流量,以便从阻塞的select()调用中唤醒线程。这里的问题是select()只能移植地在套接字上使用,而不是在管道上。或者,在MS Windows上,我可以将win32事件与套接字的状态更改和另一个事件关联起来以唤醒线程(参见WSAEventSelect),但我也不想在不可移植的WinSock API上编写此代码。

我有点不清楚为什么你需要这个中间人在这里与select合作——这是你的问题的约束吗?在我看来,如果它是,那么你必须把输出缓冲区作为一个资源,需要准备好读取,甚至在你告诉select你对写入感兴趣之前。

如果你用Queue s的小字符串交换缓冲区,这似乎会简化得多。这样,就可以有两个线程与套接字交互:

# One Thread consuming the socket
while True:
    (r, w, x) = select([s], [], [s])
    if r:
        in_buffer.put(s.recv(RECV_LIMIT))
    if x:
        break
# And one Thread writing to the socket
while True:
    string = out_buffer.get()
    (r, w, x) = select([], [s], [s])
    if w:
        s.send(string)
    if x:
        break

这样,生成线程就可以安全地发出信号,表明数据已经准备好写入了。也就是说,select是一个真正的低级接口(就这一点而言,socket也是如此),我将考虑使用一个知道更多花式功能的抽象。我偏爱gevent,但它当然是面向io绑定的应用程序的,如果您是cpu绑定的应用程序,它可能不太适合。在那里,生产者和消费者可以有效地直接与套接字交互,不需要这个中间人:

import gevent
from gevent import socket, sleep
def producer(sock):
    # We'll spit out some bytes every so often
    while True:
        sock.send('Hello from the producer!')
        sleep(0.01)
def consumer(sock):
    # We'll read some in as long as we can
    buffer = ''
    while True:
        buffer += sock.recv(100)
        # If the buffer can be consumed, we'll consume it and reset
        if len(buffer) > 500:
            print 'Consuming buffer: %s' % buffer
            buffer = ''
def client(sock):
    # This will emulate a client that prints what it recieves, but always
    # sends the same message
    while True:
        sock.send('Hello from the client!')
        print sock.recv(100)
# Run this to get the server going
listener = socket.socket()
listener.bind(('127.0.0.1', 5001))
listener.listen(5)
(sock, addr) = listener.accept()
gevent.joinall([
    gevent.spawn(producer, sock),
    gevent.spawn(consumer, sock)
])
# Run this to get a client going
connector = socket.socket()
connector.connect(('127.0.0.1', 5001))
gevent.joinall([
    gevent.spawn(client, connector)
])

最新更新