处理需要 vardata
以换行符结尾的套接字连接的最佳方法是什么n
? 我正在使用下面的代码,但有时tcp
数据包会被分块,并且需要很长时间才能匹配data.endswith("n")
。 我还尝试了其他方法,例如如果最后一行不以n
结尾,则保存最后一行并将其附加到下一个循环的data
。但这也不起作用,因为多个数据包被分块,并且第 1 部分和第 2 部分不匹配。 我无法控制另一端,它基本上发送多行以rn
结尾。
欢迎任何建议,因为我对套接字连接没有太多了解。
def receive_bar_updates(s):
global all_bars
data = ''
buffer_size = 4096
while True:
data += s.recv(buffer_size)
if not data.endswith("n"):
continue
lines = data.split("n")
lines = filter(None, lines)
for line in lines:
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
data = ""
">正常"data
示例:
line1rn
line2rn
line3rn
分块data
示例:
line1rn
line2rn
lin
如果你有一个想要作为行处理的原始输入,io模块就是你的朋友,因为它将执行行中数据包的低级组装。
您可以使用:
class SocketIO(io.RawIOBase):
def __init__(self, sock):
self.sock = sock
def read(self, sz=-1):
if (sz == -1): sz=0x7FFFFFFF
return self.sock.recv(sz)
def seekable(self):
return False
它比endswith('n')
更健壮,因为如果一个数据包包含嵌入式换行符('abncd'
),io模块将正确处理它。您的代码可能变为:
def receive_bar_updates(s):
global all_bars
data = ''
buffer_size = 4096
fd = SocketIO(s) # fd can be used as an input file object
for line in fd:
if should_be_rejected_by_filter(line): continue # do not know what filter does...
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
使用 socket.socket.makefile() 将套接字包装在包含文本 I/O 的类中。 它处理缓冲、字节和字符串之间的转换,并允许您遍历行。 请记住刷新所有写入。
例:
#!/usr/bin/env python3
import socket, threading, time
def client(addr):
with socket.create_connection(addr) as conn:
conn.sendall(b'aaa')
time.sleep(1)
conn.sendall(b'bbbn')
time.sleep(1)
conn.sendall(b'cccdddn')
time.sleep(1)
conn.sendall(b'eeefff')
time.sleep(1)
conn.sendall(b'n')
conn.shutdown(socket.SHUT_WR)
response = conn.recv(1024)
print('client got %r' % (response,))
def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as listen_socket:
listen_socket.bind(('localhost', 0))
listen_socket.listen(1)
addr = listen_socket.getsockname()
threading.Thread(target=client, args=(addr,)).start()
conn, _addr = listen_socket.accept()
conn_file = conn.makefile(mode='rw', encoding='utf-8')
for request in conn_file:
print('server got %r' % (request,))
conn_file.write('response1n')
conn_file.flush()
if __name__ == '__main__':
main()
$ ./example.py
server got 'aaabbbn'
server got 'cccdddn'
server got 'eeefffn'
client got b'response1n'
$
您是否接受不同的连接?还是一个数据流,由rn
分开?
当接受多个连接时,您将等待与s.accept()
的连接,然后处理其所有数据。当您拥有所有数据包后,处理其数据,然后等待下一个连接。 然后,您执行的操作取决于每个数据包的结构。 (示例:https://wiki.python.org/moin/TcpCommunication)
相反,如果您正在使用数据流,则可能应该处理在单独线程中找到的每一"行",同时继续使用另一个线程。
编辑:所以,如果我有你的情况正确; 一个连接,数据是由rn
分解的字符串,以n
结尾。但是,数据与您期望的数据不符,而是无限循环等待n
。
据我了解,套接字接口以空数据结果结尾。所以最后一个缓冲区可能以n
结束,但随后只是继续获取None
对象,试图找到另一个n
。
相反,请尝试添加以下内容:
if not data:
break
完整代码:
def receive_bar_updates(s):
global all_bars
data = ''
buffer_size = 4096
while True:
data += s.recv(buffer_size)
if not data:
break
if not data.endswith("n"):
continue
lines = data.split("n")
lines = filter(None, lines)
for line in lines:
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
data = ""
编辑2:糟糕,错误的代码
我还没有测试过这段代码,但它应该可以工作:
def receive_bar_updates(s):
global all_bars
data = ''
buf = ''
buffer_size = 4096
while True:
if not "rn" in data: # skip recv if we already have another line buffered.
data += s.recv(buffer_size)
if not "rn" in data:
continue
i = data.rfind("rn")
data, buf = data[:i+2], data[i+2:]
lines = data.split("rn")
lines = filter(None, lines)
for line in lines:
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
data = buf
编辑:忘了提,我只修改了接收数据的代码,我不知道函数的其余部分(从lines = data.split("n")
开始)是做什么用的。
编辑 2:现在使用"\r"代替""作为换行符。
编辑 3:修复了一个问题。
您基本上似乎想从套接字中读取行。也许您最好不要使用低级recv
调用,而只是使用sock.makefile()
并将结果视为常规文件,您可以在其中读取行:from line in sfile: ...
这就留下了延迟/块问题。这很可能是由发送端的 Nagle 算法引起的。尝试禁用它:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)