Python中线程之间的通信(不使用全局变量)



假设我们有一个主线程,它为测试模块" test_a"one_answers" test_b"启动两个线程。无论是完成测试还是遇到错误、警告或想要更新其他信息,两个测试模块线程都保持自己的状态。

主线程如何访问这些信息并采取相应的行动。例如,如果" test_a"引发错误标志;"main"如何在出现错误之前知道并停止其他测试?

一种方法是使用全局变量,但这会变得非常难看。很快。

显而易见的解决方案是共享某种可变变量,通过在constructor/start处将其传递给线程对象/函数。

要做到这一点,干净的方法是构建一个具有适当实例属性的类。如果您正在使用threading.Thread子类,而不仅仅是一个线程函数,您通常可以使用子类本身作为粘贴这些属性的地方。但我会用list来表示因为它更短:
def test_a_func(thread_state):
    # ...
    thread_state[0] = my_error_state
    # ...
def main_thread():
    test_states = [None]
    test_a = threading.Thread(target=test_a_func, args=(test_states,))
    test_a.start()

您可以(通常也想)将LockCondition打包到可变状态对象中,这样您就可以在main_threadtest_a之间正确地同步。

(另一种选择是使用queue.Queue, os.pipe等来传递信息,但您仍然需要将该队列或管道获取到子线程-您使用与上述完全相同的方式。)


然而,你是否真的需要这样做是值得考虑的。如果您将test_atest_b视为"作业",而不是"线程函数",则可以在池中执行这些作业,并让池处理传递结果或错误。

例如:

try:
    with concurrent.futures.ThreadPoolExecutor(workers=2) as executor:
        tests = [executor.submit(job) for job in (test_a, test_b)]
        for test in concurrent.futures.as_completed(tests):
            result = test.result()
except Exception as e:
    # do stuff

现在,如果test_a函数引发异常,主线程将获得该异常,并且,因为这意味着退出with块,并且所有其他作业被取消并丢弃,并且工作线程关闭。

如果你使用的是2.5-3.1,你没有内置concurrent.futures,但你可以安装PyPI的后端口,或者你可以重写multiprocessing.dummy.Pool周围的东西。(这种方式稍微更复杂,因为您必须创建一系列作业并调用map_async以返回AsyncResult对象的迭代器……但实际上这仍然非常简单。)

最新更新