我想实现一个流服务器,该服务器将数据流发送给所有已连接的客户端。多个客户端应该能够连接并断开服务器,以便以不同的方式处理数据。
每个客户端都由专用的客户端读取,该端线程sub-classes线程并包含要发送给客户端的数据队列(因为客户端可能以不同的速度处理数据,并且由于可能发生的数据爆发,因此客户可能无法处理(。
该程序通过单独的clientHandlerThread来聆听传入的客户端连接。每当客户端连接时,clienthandlerthread都会产生clientthread并将其添加到列表中。
作为一个虚拟示例,主线程每秒会增加整数,并将其推向所有客户端线程队列。
每10个增量,都会打印客户端队列中的项目数。
现在到我的问题:
当客户端断开连接时,线程将终止并且不再发送数据,但是,客户端的对象仍保留在客户端handlerthreads列表中,并将项目不断地将其推向其队列。
因此,我正在寻找一种(1(一种在客户端断开连接时从列表中删除客户端对象的方法,(2(一种比列表或(3(不同(更好的(更好的方法来监视客户端的方法建筑以归档我的目标。
非常感谢!
服务器
import socket
import time
from threading import Thread
from queue import Queue
class ClientThread(Thread):
def __init__(self, conn, client_addr):
Thread.__init__(self)
self.queue = Queue()
self.conn = conn
self.client_addr = client_addr
def run(self):
print('Client connected')
while True:
try:
self.conn.sendall(self.queue.get().encode('utf-8'))
time.sleep(1)
except BrokenPipeError:
print('Client disconnected')
break
class ClientHandlerThread(Thread):
def __init__(self):
Thread.__init__(self, daemon = True)
self.clients = list()
def push_item(self, item):
for client in self.clients:
client.queue.put(str(i))
def run(self):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind('./socket')
s.listen()
i = 1
while True:
conn, client_addr = s.accept()
client = ClientThread(conn, client_addr)
client.start()
self.clients.append(client)
i += 1
if __name__ == '__main__':
client_handler = ClientHandlerThread()
client_handler.start()
i = 1
while True:
client_handler.push_item(str(i))
if i % 10 == 0:
print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')
i += 1
time.sleep(1)
客户端:
import socket
if __name__ == '__main__':
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect('./socket')
print('Connected to server')
while True:
data = s.recv(1024)
if not data:
print('Disconnected from server')
break
print(data.decode('utf-8'))
注意您可能应该在aiohttp
之类的内容上阅读以获取答案的更多可扩展版本。
对于您的问题,您可以进行一些更改以实现这一目标:
首先,更改ClientThread
的构造函数:
class ClientThread(Thread):
def __init__(self, client_handler, conn, client_addr):
self.client_handler = client_handler
self.running = True
...
当处理程序创建对象时,它应将self
传递给其为client_handler
。
在run method
中,使用
def run(self):
while True:
...
self.running = False
self.client_handler.purge()
也就是说,它标记为不运行,并称为处理程序的purge
方法。这可以写为
class ClientHandlerThread(Thread):
...
def purge(self):
self.clients = [c for c in self.clients if c.running]