我有一个程序,一旦达到某个阈值,它就会使用线程启动另一个线程。现在,第二个线程正在多次启动。我实现了锁,但我认为我做得不对。
for i in range(max_threads):
t1 = Thread(target=grab_queue)
t1.start()
在grab_queue,我有:
。
rows.append(resultJson)
if len(rows.value()) >= 250:
with Lock():
row_thread = Thread(target=insert_rows, kwargs={'rows': rows.value()})
row_thread.start()
rows.reset()
这将启动另一个线程来处理行列表。我想确保一旦它遇到 if 条件,其他线程就不会运行,以确保不会启动处理行列表的额外线程。
您的锁覆盖了代码的错误部分。在检查行大小和重置行的代码部分之间存在争用条件。鉴于锁仅在大小检查后被获取,两个线程可以很容易地确定数组变得太大,只有这样锁才会启动以序列化数组的重置。在这种情况下,"序列化"意味着任务仍将执行两次,每个线程执行一次,但它将连续发生而不是并行发生。
正确的代码可能如下所示:
rows.append(resultJson)
with grow_lock:
if len(rows.value()) >= 250:
row_thread = Thread(target=insert_rows, kwargs={'rows': rows.value()})
row_thread.start()
rows.reset()
代码还有另一个问题,如问题所示:如果Lock()
引用threading.Lock
,则在每个调用和每个线程中创建并锁定一个新锁!锁保护线程之间共享的资源,要执行该功能,锁本身必须共享。若要解决此问题,请实例化一次锁,然后将其传递给线程的目标函数。
退一步说,您的代码实现了自定义线程池。要做到这一点并涵盖所有极端情况需要大量的工作、测试和调试。有一些专门用于此目的的生产测试模块,例如 Python 附带的 multiprocessing
模块(同时支持进程池和线程池),在重新实现其功能之前熟悉它们是个好主意。例如,有关基于 multiprocessing
的线程池的可访问介绍,请参阅本文。