我在编写IO缓冲区和套接字之间传输数据的线程时遇到了问题。让它运行没有任何问题,但不是我想要的方式。下面是代码的草图:
s = socket(...) # some connection
in_buffer = b'' # consumed by other thread
out_buffer = b'' # produced by other thread
while True:
(r, w, x) = select([s], [s], [s])
if r:
in_buffer += s.recv(RECV_LIMIT)
if w:
sent = s.send(out_buffer)
out_buffer = out_buffer[sent:]
if x:
break
这样做的问题是,它在空闲时消耗了一个完整的CPU。原因是套接字在大多数时候是可写的,特别是在空闲的时候。select()
立即返回,什么也不做,再次调用select()
,什么也不做,等等。有一个简单的解决办法,当你没有任何东西可写时,不要检查可写套接字:
... # dito
while True:
if out_buffer:
(r, w, x) = select([s], [s], [s])
else:
(r, w, x) = select([s], [], [s])
... # dito
这是有效的,但它有一个不同的问题:当空闲时,这在select()
上无限阻塞。如果我向输出缓冲区添加一些内容,我需要以某种方式从accept()
调用唤醒线程,但是如何唤醒呢?为了记录,我当前的解决方案稍微改变了评估:
while True:
(r, w, x) = select([s], [s], [s])
if x:
break
elif r:
in_buffer += s.recv(RECV_LIMIT)
elif w:
if out_buffer:
sent = s.send(out_buffer)
out_buffer = out_buffer[sent:]
else:
sleep(0.001)
简而言之,当确实无事可做时,插入延迟。毫秒甚至足以不消耗1%的CPU。类似的方法是对select()
调用使用超时,然后重新检查输出数据是否存在。不过,这两种解决方案都不好,因为两者都可以有效地归结为轮询,而轮询很糟糕。那么,如何在不轮询的情况下编写这样一个可移植的IO线程呢?
注意:一种方法是添加另一个文件描述符,我将在其上创建人工流量,以便从阻塞的select()
调用中唤醒线程。这里的问题是select()
只能移植地在套接字上使用,而不是在管道上。或者,在MS Windows上,我可以将win32事件与套接字的状态更改和另一个事件关联起来以唤醒线程(参见WSAEventSelect),但我也不想在不可移植的WinSock API上编写此代码。
我有点不清楚为什么你需要这个中间人在这里与select
合作——这是你的问题的约束吗?在我看来,如果它是,那么你必须把输出缓冲区作为一个资源,需要准备好读取,甚至在你告诉select你对写入感兴趣之前。
如果你用Queue
s的小字符串交换缓冲区,这似乎会简化得多。这样,就可以有两个线程与套接字交互:
# One Thread consuming the socket
while True:
(r, w, x) = select([s], [], [s])
if r:
in_buffer.put(s.recv(RECV_LIMIT))
if x:
break
# And one Thread writing to the socket
while True:
string = out_buffer.get()
(r, w, x) = select([], [s], [s])
if w:
s.send(string)
if x:
break
这样,生成线程就可以安全地发出信号,表明数据已经准备好写入了。也就是说,select
是一个真正的低级接口(就这一点而言,socket
也是如此),我将考虑使用一个知道更多花式功能的抽象。我偏爱gevent,但它当然是面向io绑定的应用程序的,如果您是cpu绑定的应用程序,它可能不太适合。在那里,生产者和消费者可以有效地直接与套接字交互,不需要这个中间人:
import gevent
from gevent import socket, sleep
def producer(sock):
# We'll spit out some bytes every so often
while True:
sock.send('Hello from the producer!')
sleep(0.01)
def consumer(sock):
# We'll read some in as long as we can
buffer = ''
while True:
buffer += sock.recv(100)
# If the buffer can be consumed, we'll consume it and reset
if len(buffer) > 500:
print 'Consuming buffer: %s' % buffer
buffer = ''
def client(sock):
# This will emulate a client that prints what it recieves, but always
# sends the same message
while True:
sock.send('Hello from the client!')
print sock.recv(100)
# Run this to get the server going
listener = socket.socket()
listener.bind(('127.0.0.1', 5001))
listener.listen(5)
(sock, addr) = listener.accept()
gevent.joinall([
gevent.spawn(producer, sock),
gevent.spawn(consumer, sock)
])
# Run this to get a client going
connector = socket.socket()
connector.connect(('127.0.0.1', 5001))
gevent.joinall([
gevent.spawn(client, connector)
])