在近乎实时的python中处理套接字数据



我有多达30个节点,每个节点能够每秒发送1000条消息。每个消息可以具有256-512字节的数据。每个节点使用唯一的tcp端口进行通信。接收到的每个数据都经过预处理、插入数据库和后处理。

以下是我尝试过的观察方法:-

案例1.使用asyncio处理收到的数据。

async def process_packets(reader, writer, db):
while True:
data = reader.read(4096)
data = pre_process(data)
save_in_db(data)
post_process(data)
writer.close()

观察:-对于单个数据包,处理通常需要10-20秒。但随着数据包频率的增加,tcp缓冲开始发生,即对reader.read((的单个调用会得到多个数据包。这增加了对当前节点以及其他节点的处理。

案例2.使用异步将数据推送到队列中,工作线程使用该队列。

async def process_packets(reader, writer, q):
while True:
data = reader.read(4096)
q.put(data)
writer.close()
def worker_thread(q, db):
while True:
data = q.get()
data = pre_process(data)
save_in_db(data)
post_process(data)

观察:-由于在接收数据包时不进行任何处理,所有节点都能够尽快将数据放入队列。这个问题出现在工作线程中,随着时间的推移,q.get((变得非常慢。

案例3.为每个节点创建套接字服务器线程

def server_thread(port, db):
s = socket.socket()
s.bind()
s.listen(1)
while True:
(conn, addr) = s.accept()
while True:
try:
data = conn.recv(4096)
except Exception:
conn.close()
break
data = pre_process(data)
save_in_db(data)
post_process(data)

观察:这种情况的优点是每个节点都有专门的线程来接收和处理数据,这样其他线程就不会受到影响。但是这里我面对的是socket.recv((返回的多个数据包。这增加了处理时间。

我需要一种方法来处理来自这些节点的数据,让应用程序全天候运行,不停机。

OS=Ubuntu20.04-lts
系统=intel i3第8代,8GB内存,4核

但这里我面对的是socket.recv((返回的多个数据包

TCP是一个字节流,即在该级别上没有数据包。您可能是指应用程序级别的消息。您的代码必须能够处理多个或部分应用程序消息本身,因为TCP本身不提供消息语法。虽然您在足够快的阅读速度时似乎只收到完整的消息,但这并不能保证,最终您的应用程序可能会短暂停滞(由于日程安排(,消息会累积。

处理从socket.recv()返回的多个消息甚至是一种优势。一次读取多条消息意味着单个系统调用返回更多的应用程序数据,这提高了应用程序的效率(相同工作量所需的系统调用更少(。因此,最好在一个recv中尽可能多地阅读,而不是希望只收到一条消息。

至于另一种设计:每节点一个线程的最后一种方法扩展得最好,因为在这种情况下,工作(以及负载(分布在多个CPU核心上。其他方法只使用单个CPU内核。但是,没有一种方法能够保证您的特定系统能够处理那么多数据。它们的不同之处仅在于它们对底层系统提供的资源的利用程度。

最新更新