如何在python中使用线程队列进行处理


from queue import Queue
import threading
import time
queue1 = Queue()

# this function should finish its work and restart with fresh queue1
def association_helper():
# this program should get the whole data from the queue, add into the list and print it. again it starts with
# remaining items in queue (the items which was inserting when this function printings the value)
lock = threading.Lock()
lock.acquire()
items = []
print("Start..")
while True:
if queue1.qsize()>0:
print("Line no 13:", queue1.qsize())
SizeofQueue1 = queue1.qsize()
for i in range(SizeofQueue1):
items.append(queue1.get())
queue1.task_done()
print("Line no 19:", len(items))
print(items)
print("Line no 25: done")
time.sleep(0.1)
lock.release()

i = 0

def main():
global i
# continuous data coming and adding in queue
while True:
queue1.put([i])
i += 1

if __name__ == '__main__':
# main thread will always run (adding the items in the queue)
f_thread = threading.Thread(target=association_helper)
f_thread.daemon = True
f_thread.start()
main()

output:
Start... 
Line no 13: 1415 
Line no 19: 3794 
Line no 25: done
Line no 13: 40591 
Line no 19: 41856 
Line no 25: done 
Line no 13: 78526

as per expectations, the line no 13 and line no 19 should be same. also, after, line no 25, it should print Start..(because association_helps should finish its execution and run again)

为什么association_helper函数运行一次? 为什么它没有完成其工作并使用队列中的新剩余项目重新启动?

赋予动机:

  1. queue1 将始终在主线程中添加新项目。
  2. 当 sizeof(queue1(>0 时,association_helper 应该从队列 1 中提取整个数据并使用数据进行处理。
  3. 但是应该继续在队列中添加项目
  4. association_helper 完成执行后,它应该从队列中的新项目重新开始。

让我们从最后开始:

根据预期,13号线和19号线应该相同。

由于您从一个线程中get队列并在另一个线程上插入(put(而不使用任何Lock因此您不应该期望在两行之间(在线程函数中(不会向队列中添加任何内容。 这就是您所看到的,打印第 13 行的大小并获取第 14 行的大小,结果为不同的值。

另外,在第 25 行之后,它应该打印开始。(因为association_helps应该完成其执行并再次运行(

您在进入while True循环之前print("Start..")。在那里,您将不会再看到该打印,除非您再次调用此函数。


以下是有关如何解决put/get线程队列中的争用的说明和示例:

将锁声明为全局变量。

lock1 = threading.Lock()

现在使用此锁,让我们确保队列的大小和预期的 len(items( 将产生相同的值。

with lock1:
print("Line no 13:", queue1.qsize())
SizeofQueue1 = queue1.qsize()
# and
with lock1:
queue1.put([i])

这将产生 - 相同的预期大小。

Line no 13: 9
Line no 19: 9
[[1], [2], [3], [4], [5], [6], [7], [8], [9]]
Line no 25: done

关于print("Start.."),您可以将其插入 while 循环,以便在迭代之间打印。

while True:
print("Start..")
if queue1.qsize()>0:
# The rest of the code

最后,如果希望items列表仅包含当前小版本的项,则需要清除它。 如果您不清除两次迭代之间的list,差异只会越来越大。

list.clear(( 从列表中删除所有项目。等效于 del a[:]。

您将得到:

while True:
print("Start..")
items.clear()
if queue1.qsize()>0:
# The rest of the code

相关内容

  • 没有找到相关文章