处理以换行符结尾的套接字数据



处理需要 vardata以换行符结尾的套接字连接的最佳方法是什么n? 我正在使用下面的代码,但有时tcp数据包会被分块,并且需要很长时间才能匹配data.endswith("n")。 我还尝试了其他方法,例如如果最后一行不以n结尾,则保存最后一行并将其附加到下一个循环的data。但这也不起作用,因为多个数据包被分块,并且第 1 部分和第 2 部分不匹配。 我无法控制另一端,它基本上发送多行以rn结尾。

欢迎任何建议,因为我对套接字连接没有太多了解。

def receive_bar_updates(s):
global all_bars
data = ''
buffer_size = 4096
while True:
data += s.recv(buffer_size)
if not data.endswith("n"):
continue
lines = data.split("n")
lines = filter(None, lines)
for line in lines:
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
data = ""

">正常"data示例:

line1rn
line2rn
line3rn

分块data示例:

line1rn
line2rn
lin

如果你有一个想要作为行处理的原始输入,io模块就是你的朋友,因为它将执行行中数据包的低级组装。

您可以使用:

class SocketIO(io.RawIOBase):
def __init__(self, sock):
self.sock = sock
def read(self, sz=-1):
if (sz == -1): sz=0x7FFFFFFF
return self.sock.recv(sz)
def seekable(self):
return False

它比endswith('n')更健壮,因为如果一个数据包包含嵌入式换行符('abncd'),io模块将正确处理它。您的代码可能变为:

def receive_bar_updates(s):
global all_bars
data = ''
buffer_size = 4096
fd = SocketIO(s)  # fd can be used as an input file object
for line in fd:
if should_be_rejected_by_filter(line): continue # do not know what filter does...
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()

使用 socket.socket.makefile() 将套接字包装在包含文本 I/O 的类中。 它处理缓冲、字节和字符串之间的转换,并允许您遍历行。 请记住刷新所有写入。

例:

#!/usr/bin/env python3
import socket, threading, time

def client(addr):
with socket.create_connection(addr) as conn:
conn.sendall(b'aaa')
time.sleep(1)
conn.sendall(b'bbbn')
time.sleep(1)
conn.sendall(b'cccdddn')
time.sleep(1)
conn.sendall(b'eeefff')
time.sleep(1)
conn.sendall(b'n')
conn.shutdown(socket.SHUT_WR)
response = conn.recv(1024)
print('client got %r' % (response,))

def main():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as listen_socket:
listen_socket.bind(('localhost', 0))
listen_socket.listen(1)
addr = listen_socket.getsockname()
threading.Thread(target=client, args=(addr,)).start()
conn, _addr = listen_socket.accept()
conn_file = conn.makefile(mode='rw', encoding='utf-8')
for request in conn_file:
print('server got %r' % (request,))
conn_file.write('response1n')
conn_file.flush()

if __name__ == '__main__':
main()
$ ./example.py
server got 'aaabbbn'
server got 'cccdddn'
server got 'eeefffn'
client got b'response1n'
$

您是否接受不同的连接?还是一个数据流,由rn分开?

当接受多个连接时,您将等待与s.accept()的连接,然后处理其所有数据。当您拥有所有数据包后,处理其数据,然后等待下一个连接。 然后,您执行的操作取决于每个数据包的结构。 (示例:https://wiki.python.org/moin/TcpCommunication)

相反,如果您正在使用数据流,则可能应该处理在单独线程中找到的每一"行",同时继续使用另一个线程。

编辑:所以,如果我有你的情况正确; 一个连接,数据是由rn分解的字符串,以n结尾。但是,数据与您期望的数据不符,而是无限循环等待n

据我了解,套接字接口以空数据结果结尾。所以最后一个缓冲区可能以n结束,但随后只是继续获取None对象,试图找到另一个n

相反,请尝试添加以下内容:

if not data:
break

完整代码:

def receive_bar_updates(s):
global all_bars
data = ''
buffer_size = 4096
while True:
data += s.recv(buffer_size)
if not data:
break
if not data.endswith("n"):
continue
lines = data.split("n")
lines = filter(None, lines)
for line in lines:
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
data = ""

编辑2:糟糕,错误的代码

我还没有测试过这段代码,但它应该可以工作:

def receive_bar_updates(s):
global all_bars
data = ''
buf = ''
buffer_size = 4096
while True:
if not "rn" in data:  # skip recv if we already have another line buffered.
data += s.recv(buffer_size)
if not "rn" in data:
continue
i = data.rfind("rn")
data, buf = data[:i+2], data[i+2:]
lines = data.split("rn")
lines = filter(None, lines)
for line in lines:
if line.startswith("BH") or line.startswith("BC"):
symbol = str(line.split(",")[1])
all_bars[symbol].append(line)
y = Thread(target=proccess_bars, kwargs={'symbol': symbol})
y.start()
data = buf

编辑:忘了提,我只修改了接收数据的代码,我不知道函数的其余部分(从lines = data.split("n")开始)是做什么用的。

编辑 2:现在使用"\r"代替""作为换行符。

编辑 3:修复了一个问题。

您基本上似乎想从套接字中读取行。也许您最好不要使用低级recv调用,而只是使用sock.makefile()并将结果视为常规文件,您可以在其中读取行:from line in sfile: ...

这就留下了延迟/块问题。这很可能是由发送端的 Nagle 算法引起的。尝试禁用它:

sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

最新更新