asyncio queue - inserting items with a regular generator - confusing behavior, unexpected values



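(Using Python 3.8.1)

I am adding items to an asyncio queue using the output of a regular generator function (job_gen_generator() below).

The workers that pull items from the queue receive different values from the ones I expect to have been added to the queue from that generator's output.

Full demo code: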

import asyncio

def job_gen_generator():
    # incrementally generates trivial "jobs" data in a dict
    job = 1
    job_dict = {
        'job_number': job,
    }
    while True:
        yield job_dict
        job_dict['job_number'] += 1
job_gen = job_gen_generator()

async def worker(instance, q):
    # workers get jobs from the queue
    while True:
        count, job = await q.get()
        print(f'Worker {instance} got queue item: {count} with data of...{job}')
        await asyncio.sleep(1)
        q.task_done()

async def main():
    queue = asyncio.Queue()
    for count in range(10):
        job = next(job_gen)
        print(f'Adding queue item: {count} with data of... {job}')
        queue.put_nowait((count, job))  # <- Issue here

    # The rest of the code is necessary boilerplate
    worker_tasks = []
    for instance in range(2):
        task = asyncio.create_task(worker(instance, queue))
        worker_tasks.append(task)
    await queue.join()
    for task in worker_tasks:
        task.cancel()
    # return_exceptions=True stops the cancelled workers' CancelledError from propagating out of main()
    await asyncio.gather(*worker_tasks, return_exceptions=True)
asyncio.run(main())

Actual output:

Adding queue item: 0 with data of... {'job_number': 0}
Adding queue item: 1 with data of... {'job_number': 1}
Adding queue item: 2 with data of... {'job_number': 2}
Adding queue item: 3 with data of... {'job_number': 3}
...etc
Worker 0 got queue item: 0 with data of...{'job_number': 9} # all items are 9, the last output from the generator
Worker 1 got queue item: 1 with data of...{'job_number': 9}
Worker 0 got queue item: 2 with data of...{'job_number': 9}
Worker 1 got queue item: 3 with data of...{'job_number': 9}
...etc

Expected output:

Adding queue item: 0 with data of... {'job_number': 0}
Adding queue item: 1 with data of... {'job_number': 1}
Adding queue item: 2 with data of... {'job_number': 2}
Adding queue item: 3 with data of... {'job_number': 3}
...etc
Worker 0 got queue item: 0 with data of...{'job_number': 0}
Worker 1 got queue item: 1 with data of...{'job_number': 1}
Worker 0 got queue item: 2 with data of...{'job_number': 2}
Worker 1 got queue item: 3 with data of...{'job_number': 3}
...etc

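Question

I don't understand why this happens.

  • I understand that queue.put_nowait() is a synchronous function, so it should complete before the next iteration of the for loop starts, right?
  • But it looks as if the for loop runs to completion (overwriting the variable that captures the generator's output) and only then submits all the items to the queue, and I can't explain why that would be the execution flow.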
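The first bullet's premise can be checked in isolation. The minimal sketch below (the `demo` coroutine is a hypothetical helper, not part of the question's code) shows that put_nowait() has already finished when it returns, and that the queue keeps a reference to the dict rather than a snapshot of it:

```python
import asyncio

async def demo():
    queue = asyncio.Queue()
    job = {'job_number': 0}
    queue.put_nowait((0, job))
    # put_nowait() has already completed: the item is in the queue right now
    print(queue.qsize())
    # mutate the dict *after* it was queued
    job['job_number'] = 9
    count, queued_job = queue.get_nowait()
    # the queue holds the same dict object, so it sees the later mutation
    print(queued_job is job, queued_job)
    return queued_job is job, queued_job

result = asyncio.run(demo())
```

So the execution order is as expected; what surprises is that every queue item can share one mutable object.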

Note: I can work around this by wrapping job in dict(), i.e. queue.put_nowait((count, dict(job))), but I don't understand why that helps; all I'm doing is making a dict out of a dict.
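Why dict(job) helps can be seen directly: dict() called on a dict builds a new dictionary object with the same key/value pairs (a shallow copy), so later mutation of the original does not reach the copy. A minimal sketch:

```python
job = {'job_number': 0}
# dict() creates a new dict object with the same key/value pairs (shallow copy)
snapshot = dict(job)
# a later mutation of the original dict...
job['job_number'] = 9
# ...does not affect the copy, because they are two distinct objects
print(snapshot is job)
print(snapshot)
```

Because the copy is shallow, mutable values nested inside the dict would still be shared; with plain int values, as here, each queued copy is independent.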

As the comments say, you yield the same dict object every time.

def job_gen_generator():
    # incrementally generates trivial "jobs" data in a dict
    job = 1
    job_dict = {
        'job_number': job,
    }
    while True:
        yield job_dict
        job_dict['job_number'] += 1

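This is a loop: every call to next() returns the same object, job_dict. put_nowait() does run synchronously, but it stores a reference to that dict, not a copy, so all ten queue items point to one object that the generator keeps incrementing; by the time the workers read it, it holds the last value. That is also why dict(job) helps: it creates a new dict for each item. In your case you want a different dict for each job, right?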
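The "same object every time" behavior can be confirmed with an identity check on two consecutive next() calls, using the question's original generator:

```python
def job_gen_generator():
    # the question's generator: one dict, mutated in place after each yield
    job = 1
    job_dict = {'job_number': job}
    while True:
        yield job_dict
        job_dict['job_number'] += 1

gen = job_gen_generator()
first = next(gen)   # the dict with job_number 1
second = next(gen)  # the generator increments the same dict, then yields it again
print(first is second)
print(first)        # "first" now shows the incremented value as well
```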

def job_gen_generator():
    # incrementally generates trivial "jobs" data in a dict
    job = 1
    while True:
        yield {
            'job_number': job,
        }
        job += 1
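Plugged back into the question's queue setup, the fixed generator gives each worker its own value. A minimal sketch, assuming a hypothetical `received` list that the workers append to so the result can be inspected, and a sleep shortened from the question's 1 second:

```python
import asyncio

def job_gen_generator():
    # fixed generator: a fresh dict on every iteration
    job = 1
    while True:
        yield {'job_number': job}
        job += 1

async def worker(instance, q, received):
    while True:
        count, job = await q.get()
        print(f'Worker {instance} got queue item: {count} with data of...{job}')
        # hypothetical bookkeeping so the results can be checked afterwards
        received.append((count, job['job_number']))
        await asyncio.sleep(0.01)  # shortened from the question's 1 second
        q.task_done()

async def main():
    job_gen = job_gen_generator()
    queue = asyncio.Queue()
    for count in range(10):
        queue.put_nowait((count, next(job_gen)))
    received = []
    tasks = [asyncio.create_task(worker(i, queue, received)) for i in range(2)]
    await queue.join()
    for task in tasks:
        task.cancel()
    # swallow the CancelledError raised by the cancelled workers
    await asyncio.gather(*tasks, return_exceptions=True)
    return received

received = asyncio.run(main())
```

Since this generator starts at job = 1, queue item 0 carries job_number 1, item 1 carries 2, and so on; each item now holds its own value instead of all items showing the last one.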
