多线程 Python 服务器,在线程完成时退出



假设你有一个相当基本的客户端/服务器代码,其中每个客户端创建三个线程,多个客户端可以同时连接。我希望服务器等待传入连接,一旦它开始获得连接,就运行直到没有更多的线程运行,然后退出。代码类似于下面。(即,我希望服务器"永远服务",而不是服务器"永远服务",我希望它在所有线程完成后退出)。

编辑:我希望服务器等待传入连接。连接开始后,它应该继续接受连接,直到没有线程仍在运行,然后退出。这些连接将有些零星。

import socket
import threading
# Our thread class:
class ClientThread ( threading.Thread ):
   # Override Thread's __init__ method to accept the parameters needed:
   def __init__ ( self, channel, details ):
      self.channel = channel
      self.details = details
      threading.Thread.__init__ ( self )
   def run ( self ):
      print 'Received connection:', self.details [ 0 ]
      self.channel.send ( 'hello from server' )
      for x in xrange ( 10 ):
         print self.channel.recv ( 1024 )
      self.channel.close()
      print 'Closed connection:', self.details [ 0 ]
# Set up the server:
server = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
server.bind ( ( '', 2727 ) )
server.listen ( 5 )
# Have the server serve "forever":
while True:
   channel, details = server.accept()
   ClientThread ( channel, details ).start()

根据您的评论,您要查找的是在第一条连接到服务器后开始计算连接数,并在没有更多现有连接时终止服务器。

当前无限 while 循环的一个直接问题是每个accept()上的块。因此,无论如何,它将始终等待另一个连接。您必须从其他线程中断它才能使其退出该循环。但另一种解决方案是使事件循环更大,接受新连接的行为只是其中的一部分。循环还应检查退出条件。

此示例只是一种可能的方法。它利用 Queue.Queue 来协调工作计数器。

import socket
import threading
import select
from Queue import Queue
class ClientThread ( threading.Thread ):
   def __init__ ( self, channel, details, queue=None ):
      self.channel = channel
      self.details = details
      self.queue = queue
      threading.Thread.__init__ ( self )
   def run ( self ):
      if self.queue:
         self.queue.put(1)
      print 'Received connection:', self.details [ 0 ]
      self.channel.send ( 'hello from server' )
      for x in xrange ( 10 ):
         print self.channel.recv ( 1024 )
      self.channel.close()
      print 'Closed connection:', self.details [ 0 ]
      if self.queue:
         self.queue.get_nowait()
# Set up the server:
server = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
server.bind ( ( '', 2727 ) )
server.listen ( 5 )
rlist = [server]
work_queue = Queue()
def accept_client():
   channel, details = server.accept()
   ClientThread ( channel, details, work_queue ).start() 
accept_client()
while not work_queue.empty():
   server_ready, _, _ = select.select(rlist,[],[], .25)
   if server in server_ready:
      accept_client()
print "Shutting down"
server.close()
print "Exiting"

我们使用 select.select 作为检测服务器套接字活动的一种简单方法,但也有超时。如果服务器准备就绪,则我们接受新连接。如果它达到 .25 秒超时,我们只需再次循环并等待。

您将看到我们创建了一个队列并不断检查它是否为空。队列将传递到每个线程中。当线程启动时,它会将一些工作记录到队列中。数据是任意的。只是一个旗帜。线程完成后,它将从队列中清除该项。结果是,在收到第一个连接后,队列不再为空,循环将继续运行。如果在任何时候队列变为空(因为所有当前线程都已完成),循环将中断,服务器将关闭。

如果从while循环中break,则进程将等到所有ClientThreads退出,然后终止。

这将起作用,因为客户端线程是非守护程序。有关此内容的更多详细信息,请参阅threading.Thread.daemon

最新更新