在 Python 中锁定任务的线程



我有一个程序,一旦达到某个阈值,它就会使用线程启动另一个线程。现在,第二个线程正在多次启动。我实现了锁,但我认为我做得不对。

for i in range(max_threads):
    t1 = Thread(target=grab_queue)
    t1.start()

在grab_queue,我有:

   rows.append(resultJson)
    if len(rows.value()) >= 250:
        with Lock():
            row_thread = Thread(target=insert_rows, kwargs={'rows': rows.value()})
            row_thread.start()
            rows.reset()

这将启动另一个线程来处理行列表。我想确保一旦它遇到 if 条件,其他线程就不会运行,以确保不会启动处理行列表的额外线程。

您的锁覆盖了代码的错误部分。在检查行大小和重置行的代码部分之间存在争用条件。鉴于锁仅在大小检查被获取,两个线程可以很容易地确定数组变得太大,只有这样锁才会启动以序列化数组的重置。在这种情况下,"序列化"意味着任务仍将执行两次,每个线程执行一次,但它将连续发生而不是并行发生。

正确的代码可能如下所示:

rows.append(resultJson)
with grow_lock:
    if len(rows.value()) >= 250:
        row_thread = Thread(target=insert_rows, kwargs={'rows': rows.value()})
        row_thread.start()
        rows.reset()

代码还有另一个问题,如问题所示:如果Lock()引用threading.Lock,则在每个调用和每个线程中创建并锁定一个新锁!锁保护线程之间共享的资源,要执行该功能,锁本身必须共享。若要解决此问题,请实例化一次锁,然后将其传递给线程的目标函数。

退一步说,您的代码实现了自定义线程池。要做到这一点并涵盖所有极端情况需要大量的工作、测试和调试。有一些专门用于此目的的生产测试模块,例如 Python 附带的 multiprocessing 模块(同时支持进程池和线程池),在重新实现其功能之前熟悉它们是个好主意。例如,有关基于 multiprocessing 的线程池的可访问介绍,请参阅本文。

相关内容

  • 没有找到相关文章

最新更新