通过websocket发送的文件接收时变得太大



我正在尝试学习Python中的asyncio websockets。我实现了一个websocket服务器,它接收二进制数据并输出到另一台计算机。

问题是,当数据到达另一台计算机时,生成的文件非常大。对于小文件(如只有2行的 .txt 文件),它工作得很好,但对于大文件(约5MB及以上),接收计算机中的结果文件却有4GB。

我找不到是什么原因造成的。无论我做什么,发送者的文件大小和接收者的文件大小永远不匹配。

一些代码:

FileManager.py

class Manager():
BUFFER_SIZE = 8092
file = None
filesize = None
filename = None
received_file = bytearray()
sent = 0
lock = asyncio.Lock()
secret = None
ws = None
def __init__(self,  secret=None, ws: websockets.WebSocketServerProtocol = None):
self.ws = ws
self.secret = secret
def open_file(self, filename, mode):
self.file = open(filename, mode)
def close_file(self):
self.file.close()
async def chunk_sender(self):
async with self.lock:
self.file.seek(self.sent)
bytes_read = self.file.read(self.BUFFER_SIZE)
await self.ws.send(json.dumps({
"cmd": "send",
"key": self.secret,
"data": bytes_read.decode("utf-8")
}))
self.sent += self.BUFFER_SIZE
async def chunk_receiver(self, binary):
async with self.lock:
self.received_file += binary
self.file.write(self.received_file)
perc = ((len(self.received_file) * 100)/self.filesize)
print("rDownloading file: " + colored(str(round(perc, 2)) + "%", "magenta"), end='', flush=True)

async def start_sending(self):
self.open_file(self.filename, "rb")
spawn = math.ceil(self.filesize / self.BUFFER_SIZE)
tasks = []
for _ in range(spawn):
tasks.append(self.chunk_sender())
pbar = tqdm.tqdm(total=len(tasks), leave=True, mininterval=0)
for process in asyncio.as_completed(tasks):
value = await process
pbar.set_description(value)
pbar.update()

ClientManager.py

import websockets
import json
from termcolor import colored
from classes import File

class Manager:
    """Websocket client that acts as the sender or the receiver of a file.

    Attributes:
        SERVER_URL: URL of the relay websocket server.
        filename: name of the file to transfer.
        filesize: size of the file in bytes.
        secret: key sent in the "key" field of every message.
        FileManager: per-client ``File.Manager`` that does the chunk I/O.
    """

    SERVER_URL = None
    filename = None
    filesize = 0
    secret = None

    def __init__(self, SERVER_URL, filename, filesize, secret):
        self.SERVER_URL = SERVER_URL
        self.filename = filename
        self.filesize = filesize
        self.secret = secret
        # One FileManager per client: as a class attribute it was shared by
        # every Manager instance.
        self.FileManager = File.Manager()
        self.FileManager.secret = self.secret
        self.FileManager.filesize = self.filesize
        self.FileManager.filename = self.filename

    async def start_sender(self):
        """Connect as the sender and answer the receiver's requests.

        Announces itself with "sender_init", then loops: replies to
        "receiver_init" with the file details and to "receiver_request"
        by sending the file.
        """
        async with websockets.connect(self.SERVER_URL) as ws:
            self.FileManager.ws = ws
            await ws.send(json.dumps({"cmd": "sender_init", "key": self.secret}))
            print("Now in the receiver computer", end=" ")
            print(colored("sendpai " + self.secret, "magenta"))
            while True:
                message = await ws.recv()
                deserialized = json.loads(message)
                cmd = deserialized["cmd"]
                if cmd == "receiver_request":
                    await self.FileManager.start_sending()
                elif cmd == "receiver_init":
                    await ws.send(json.dumps({"cmd": "file_details", "key": self.secret, "filename": self.filename, "filesize": self.filesize}))

    async def start_receiver(self):
        """Connect as the receiver and write incoming chunks to disk.

        Announces itself with "receiver_init", opens the output file when
        "file_details" arrives, requests the transfer, and passes every
        "send" payload to the FileManager. The output file is closed when
        the loop ends for any reason.
        """
        async with websockets.connect(self.SERVER_URL) as ws:
            self.FileManager.ws = ws
            await ws.send(json.dumps({"cmd": "receiver_init", "key": self.secret}))
            try:
                while True:
                    message = await ws.recv()
                    deserialized = json.loads(message)
                    cmd = deserialized.get("cmd")
                    if cmd == "send":
                        if "data" in deserialized:
                            binary_chunk = bytes(
                                deserialized["data"], encoding="utf-8")
                            await self.FileManager.chunk_receiver(binary_chunk)
                    elif cmd == "file_details":
                        self.FileManager.filename = deserialized["filename"]
                        self.FileManager.filesize = deserialized["filesize"]
                        # NOTE(review): the output path is hard-coded; the
                        # announced filename is only displayed. If it is ever
                        # used as a path, sanitize it first.
                        self.FileManager.open_file("hello", "wb")
                        await ws.send(json.dumps({"cmd": "receiver_request", "key": self.secret}))
                        print("[The file is about to be downloaded]")
                        print(
                            "filename: " + colored(str(self.FileManager.filename), "green"), end=" ")
                        # filesize is in bytes; divide by 1,000,000 so the
                        # value matches the "mb" label.
                        print(
                            "filesize: " + colored(str(self.FileManager.filesize / 1_000_000) + "mb", "yellow"))
            finally:
                # Flush and release the output file even when the connection
                # drops mid-transfer.
                if self.FileManager.file is not None:
                    self.FileManager.close_file()

Server.py

class Server():
    """Relay that forwards JSON messages between a sender and a receiver
    sharing the same key.

    Attributes:
        clients: list of ``{"key", "ws", "who"}`` dicts, one per registered
            connection; ``who`` is "sender" or "receiver".
        clients_lock: guards mutations of ``clients``.
    """

    def __init__(self):
        # Per-instance state: as class attributes these were shared by every
        # Server object.
        self.clients = []
        self.clients_lock = threading.Lock()

    async def register(self, ws: "websockets.WebSocketServerProtocol", key, who) -> None:
        """Record *ws* as *who* for *key*, ignoring an identical entry."""
        with self.clients_lock:
            known = any(
                c["ws"] is ws and c["key"] == key and c["who"] == who
                for c in self.clients
            )
            # A duplicate entry would make every forwarded message arrive
            # twice on that connection.
            if not known:
                self.clients.append({"key": key, "ws": ws, "who": who})
        logging.info("%s %s connects", who, ws.remote_address[0])

    async def unregister(self, ws: "websockets.WebSocketServerProtocol") -> None:
        """Remove every entry that belongs to *ws*."""
        with self.clients_lock:
            # `del client` inside a loop only unbinds the loop variable, so
            # rebuild the list without this connection instead.
            self.clients[:] = [c for c in self.clients if c["ws"] is not ws]
        address = ws.remote_address
        logging.info("%s disconnects", address[0] if address else "unknown")

    async def init_event(self, ws: "websockets.WebSocketServerProtocol", key: str, who: str) -> None:
        """Handle a "sender_init" / "receiver_init" message."""
        await self.register(ws, key, who)
        logging.info("%s with key %s", ws.remote_address[0], key)

    async def receiver_request_event(self, ws: "websockets.WebSocketServerProtocol", key: str) -> None:
        """Tell the sender registered under *key* to start the transfer.

        The receiver was already registered on "receiver_init", so it is not
        registered again here, and the request goes only to the sender.
        """
        await self.send_to_sender(key, json.dumps({"cmd": "receiver_request"}))

    async def _send_to(self, key, who, message):
        """Send *message* to every client registered as *who* under *key*."""
        # Iterate over a snapshot: the list may change while awaiting send().
        for client in list(self.clients):
            if client["key"] == key and client["who"] == who:
                await client["ws"].send(message)

    async def send_to_receiver(self, key, message):
        """Forward *message* to the receiver(s) registered under *key*."""
        await self._send_to(key, "receiver", message)

    async def send_to_sender(self, key, message):
        """Forward *message* to the sender(s) registered under *key*."""
        await self._send_to(key, "sender", message)

    async def ws_handler(self, ws: "websockets.WebSocketServerProtocol", uri: str = None):
        """Dispatch every message of one connection by its "cmd" field.

        Args:
            ws: the connection to serve.
            uri: request path; unused here. Optional so the handler can also
                be called with the connection only — presumably what newer
                websockets releases do; verify against the installed version.
        """
        try:
            async for message in ws:
                try:
                    deserialized = json.loads(message)
                    cmd = deserialized["cmd"]
                    key = deserialized["key"]
                except (ValueError, KeyError, TypeError):
                    # One malformed message must not kill the connection.
                    logging.warning("Ignoring malformed message")
                    continue
                if cmd == "sender_init":
                    await self.init_event(ws, key, "sender")
                elif cmd == "receiver_request":
                    await self.receiver_request_event(ws, key)
                elif cmd == "send":
                    await self.send_to_receiver(key, message)
                elif cmd == "receiver_init":
                    await self.init_event(ws, key, "receiver")
                    await self.send_to_sender(key, message)
                elif cmd == "file_details":
                    await self.send_to_receiver(key, message)
        except websockets.exceptions.ConnectionClosed:
            logging.info("Connection closed")
        finally:
            # Drop the connection so later messages are not sent to a dead
            # socket.
            await self.unregister(ws)

我尝试调试代码,排查了以下几种可能:

  • 发送的数据块是否比需要的多
  • 服务器是否向接收端重复发送了同一条websocket消息
  • 发送数据时是否存在编码问题

我注意到的一件事是,chunk_sender函数中需要加锁:多个协程同时运行时,会从同一个文件指针位置重复读取多次。加锁之后情况有所改善,但问题仍然存在。

提前感谢。

没关系,2个错误:

在以下两个服务器事件中,接收端客户端都被添加到了我的列表里,因此被注册了两次:

self.receiver_request_event(ws, key)

self.init_event(ws, key, "receiver")

所以,每条websocket消息发出后,接收端都会收到两次。

还有,每次应该只写入刚收到的二进制数据块,而不是把已经累积的全部字节重新写入一遍:

async def chunk_receiver(self, binary):
    """Append one received chunk to the output file.

    Args:
        binary: the bytes of the chunk that just arrived.
    """
    async with self.lock:
        # Keep the running total of received bytes.
        self.received_file += binary
        # Write only the new chunk; writing self.received_file here would
        # re-write everything received so far on every call.
        self.file.write(binary)
        ...

最新更新