在处理资源时暂停所有线程



process_data()内部的if语句中,当条件满足时,我如何暂停所有其他线程,等待insert_list上的工作完成,然后再次恢复它们?

import threading
import logging
import queue

logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] (%(threadName)-10s) %(message)s')

def process_data(data, insert_list):
while True:
item = data.get()
logging.debug(f'Working on: {item}.')
if item is None:
break
insert_list.append(item)
if len(insert_list) == 2:
logging.debug(f'Committing: {insert_list}')
insert_list = []
data.task_done()

num_workers = 4
insert_list = []
data = queue.Queue()
threads = []
for i in range(num_workers):
t = threading.Thread(target=process_data, args=(data, insert_list))
t.start()
threads.append(t)
for n in range(1,11):
data.put(n)
data.join()
for i in range(num_workers):
data.put(None)
for t in threads:
t.join()

我想看到的是这样的东西:

[DEBUG] (Thread-1  ) Working on: 1.
[DEBUG] (Thread-2  ) Working on: 2.
[DEBUG] (Thread-2  ) Committing: [1, 2]
[DEBUG] (Thread-1  ) Working on: 6.
[DEBUG] (Thread-4  ) Working on: 4.
[DEBUG] (Thread-4  ) Committing: [6, 4]
etc...

对于上下文,我试图使用线程来加快将文本文件中的大量数据加载到数据库的速度。我使用线程来提取数据,并将其添加到所有线程共享的列表中。当列表达到一定大小时,我会获取列表并将其项目提交到数据库(不支持并发插入(,而我希望其他线程暂停向列表添加新项目,等待列表处理,然后继续。我不担心数据的处理顺序,只担心线程停止向列表中添加新项目,直到列表再次准备好。

谢谢。

也许为insert_list定义了最大大小的标准队列会是一个更好的决定,但不确定:

insert_list = Queue(max_size=2)
def process_data(data, insert_list):
while True:
item = data.get()
logging.debug(f'Working on: {item}.')
if item is None:
break
insert_list.put(item)
if insert_list.full():
i_list = [insert_list.get() for _ in range(insert_list.qsize())]
logging.debug(f'Committing: {i_list}')
data.task_done()

最新更新