Python:存储并删除线程到/从列表中



我想实现一个流服务器,该服务器将数据流发送给所有已连接的客户端。多个客户端应该能够连接并断开服务器,以便以不同的方式处理数据。

每个客户端都由专用的客户端读取,该端线程sub-classes线程并包含要发送给客户端的数据队列(因为客户端可能以不同的速度处理数据,并且由于可能发生的数据爆发,因此客户可能无法处理(。

该程序通过单独的clientHandlerThread来聆听传入的客户端连接。每当客户端连接时,clienthandlerthread都会产生clientthread并将其添加到列表中。

作为一个虚拟示例,主线程每秒会增加整数,并将其推向所有客户端线程队列。

每10个增量,都会打印客户端队列中的项目数。


现在到我的问题:

当客户端断开连接时,线程将终止并且不再发送数据,但是,客户端的对象仍保留在客户端handlerthreads列表中,并将项目不断地将其推向其队列。

因此,我正在寻找一种(1(一种在客户端断开连接时从列表中删除客户端对象的方法,(2(一种比列表或(3(不同(更好的(更好的方法来监视客户端的方法建筑以归档我的目标。

非常感谢!


服务器

import socket
import time
from threading import Thread
from queue import Queue

class ClientThread(Thread):
    def __init__(self, conn, client_addr):
        Thread.__init__(self)
        self.queue = Queue()
        self.conn = conn
        self.client_addr = client_addr
    def run(self):
        print('Client connected')
        while True:
            try:
                self.conn.sendall(self.queue.get().encode('utf-8'))
                time.sleep(1)
            except BrokenPipeError:
                print('Client disconnected')
                break

class ClientHandlerThread(Thread):
    def __init__(self):
        Thread.__init__(self, daemon = True)
        self.clients = list()
    def push_item(self, item):
        for client in self.clients:
            client.queue.put(str(i))
    def run(self):
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.bind('./socket')
            s.listen()
            i = 1
            while True:
                conn, client_addr = s.accept()
                client = ClientThread(conn, client_addr)
                client.start()
                self.clients.append(client)
                i += 1

if __name__ == '__main__':
    client_handler = ClientHandlerThread()
    client_handler.start()
    i = 1
    while True:
        client_handler.push_item(str(i))
        if i % 10 == 0:
            print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')

        i += 1
        time.sleep(1)

客户端:

import socket
if __name__ == '__main__':
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect('./socket')
        print('Connected to server')
        while True:
            data = s.recv(1024)
            if not data:
                print('Disconnected from server')
                break
            print(data.decode('utf-8'))

注意您可能应该在aiohttp之类的内容上阅读以获取答案的更多可扩展版本。


对于您的问题,您可以进行一些更改以实现这一目标:

首先,更改ClientThread的构造函数:

class ClientThread(Thread):
    def __init__(self, client_handler, conn, client_addr):
        self.client_handler = client_handler
        self.running = True
        ...

当处理程序创建对象时,它应将self传递给其为client_handler

run method中,使用

   def run(self):
        while True:
            ...
        self.running = False
        self.client_handler.purge() 

也就是说,它标记为不运行,并称为处理程序的purge方法。这可以写为

class ClientHandlerThread(Thread):
    ...
    def purge(self):
        self.clients = [c for c in self.clients if c.running]

最新更新