我有一个字典作为类变量。vsw。VswInformation是一个简单的数据类,主要包含字符串。有一个类方法添加了vsw。VswInformation元素,以及获取该字典的另一个类方法。
类别如下:
class Swc:
lock: Lock = Lock()
vsw_infos: Dict[str, vsw.VswInformation] = dict()
def __init__(self):
# This is a loopback server
self.exchange_module: exchange.ExchangeModule = exchange.ExchangeModule()
@classmethod
def add_vsw_info(cls, vsw_info: vsw.VswInformation):
# Adds elements to the class dictionary
with Swc.lock:
if vsw_info.serial_number == None:
raise SwcException('Needed input parameters in the VSW Information was missing')
key: str = vsw_info.serial_number
Swc.vsw_infos[key] = vsw_info
@classmethod
def get_vsw_entries(cls) -> Dict[str, vsw.VswInformation]:
# Gets the class dictionary
with Swc.lock:
return Swc.vsw_infos
在单元测试中,我实例化Swc对象,然后创建两个vsw。VswInformation对象,并调用Swc类方法将它们添加到类变量中。unittest向环回服务器发送GET请求,环回服务器调用Swc类方法来获取类字典。然而,它是空的。
在调试程序的过程中,我将进入添加该项的两个类方法,并查看类字典是否按预期进行了更新。但是,当我调用类方法在服务器中获取字典时,字典是空的。就好像这条线:
vsw_infos: Dict[str, vsw.VswInformation] = dict()
每次我调用该方法以在服务器中获取字典时都会调用(如果我在unittest中调用该方法,则不是这样(。
因此,我尝试用__init__()
方法初始化字典,但结果相同。我做错了什么?更好的是,我的想法出了什么问题?这个类字典不是像一个全局变量一样通过引用传递吗?我需要全局锁吗(锁似乎有效,但它不是全局锁(。
这是单元测试:
def test_get_all_vsw_infos_from_swc(self):
swc_main: Swc = Swc()
# Create two vsw_info entries
Swc.add_vsw_info(self.create_vsw_info('1234rr'))
Swc.add_vsw_info(self.create_vsw_info('1234ss'))
connection: http.client.HTTPConnection = http.client.HTTPConnection("localhost", 9090)
connection.request("GET", "/watches/")
# The test fails here because of the empty dictionary
response: http.client.HTTPResponse = connection.getresponse()
...
服务器中的GET请求处理程序如下:
def do_GET(self):
print(f"GET request received. Path: {self.path}")
cmd_index: int = self.path.find("watches/")
response_code: int = 200
if cmd_index >= 0:
json_bytes: bytes = None
# This is empty but I expected two entries
dicts = swc.Swc.get_vsw_entries()
...
肯定有一些帖子解决了这个问题,但我找不到一个,尽管SO提供了大量的相似之处。
我发现了问题——我将服务器作为Python进程而不是Python线程运行。我之所以将其作为进程运行,是因为我找不到将serve_forever((作为线程停止的方法。作为进程,可以调用terminate((。
回到Thread,我在while循环中将"serve_forever"替换为handle_request((:
def start_server(self):
print("HTTP Server started")
while self.keep_running:
self.exchange_server.handle_request()
print("Server thread stopped")
这里的问题是handle_request((正在阻塞,所以在我的关闭方法中,我不得不调用一个客户端来发送一个无用的GET请求,这释放了线程
def close_exchange_module(self):
if self.isRunning:
# Release the while loop
self.keep_running = False
# 'unblock' the handle_request()
connection: HTTPConnection = HTTPConnection("localhost", 9090)
connection.request("GET", "/")
connection.close()
self.exchange_thread.join()
self.exchange_server.server_close()
self.is_running = False
print("Server stopped.")