我有一个基于 asyncio.Queue 的生产者和消费者,它们作为两个无限循环运行。生产者定期将作业添加到队列中;消费者等待作业可用,处理它,然后继续等待下一个作业。
然而,由于某种原因,我的消费者从未被调用。我认为这是因为生产者任务从不把控制权让给消费者。
有什么办法可以修复它,让这两个工作者都能像上面描述的那样在后台运行吗?
import asyncio
import concurrent.futures
import time
class Consumer:
    """Pull detection timestamps off a queue and process them one at a time.

    An event is skipped when its timestamp, minus
    ``_duration_before_restart_ms``, is earlier than the time the previous
    event finished processing.
    """

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        # Window (ms) subtracted from an event's timestamp before it is
        # compared with the last trigger time.
        self._duration_before_restart_ms = 3000
        # Wall-clock time (ms) at which the last event finished processing.
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        """Wait for queued events forever, handling each in arrival order."""
        while True:
            print("Consumer new iteration.")
            event_ms = await self._queue.get()
            print("Consumer new event: ", event_ms)

            threshold_ms = event_ms - self._duration_before_restart_ms
            is_fresh = not (threshold_ms < self._last_triggered_time_ms)
            if is_fresh:
                print("Consumer processing event: ", event_ms)
                # Simulate authentication (an io bound operation) with sleep.
                # NOTE: time.sleep() is a blocking call; while it runs, no
                # other task on the same event loop can make progress.
                time.sleep(5)
                self._last_triggered_time_ms = int(time.time() * 1000)
                print(
                    "Consumer processed event: ",
                    event_ms,
                    " at: ",
                    self._last_triggered_time_ms,
                )
            else:
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                print("Consumer skipping event: ", event_ms)
class Producer:
    """Periodically push the current time (ms) onto a queue as a detection.

    A detection is enqueued only while ``counter % 10 > 5`` and only when more
    than ``_detection_time_period_ms`` has passed since the previous one.
    """

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        # Wall-clock time (ms) of the most recently enqueued detection.
        self._last_detection_time_ms = 0
        # Minimum spacing (ms) between two enqueued detections.
        self._detection_time_period_ms = 3000

    async def producer_loop(self):
        """Tick forever, enqueueing a timestamp when a detection is due."""
        tick = 0
        while True:
            # Iterates at 2fps
            # NOTE: time.sleep() is a blocking call; while it runs, no other
            # task on the same event loop can make progress.
            time.sleep(0.5)
            print("Producer counter: ", tick)
            now_ms = int(time.time() * 1000)

            next_allowed_ms = (
                self._last_detection_time_ms + self._detection_time_period_ms
            )
            if tick % 10 > 5 and next_allowed_ms < now_ms:
                print("Producer adding to queue: ", now_ms)
                await self._queue.put(now_ms)
                print("Producer added to queue: ", now_ms)
                self._last_detection_time_ms = now_ms

            tick += 1
async def main():
    """Start the producer and consumer loops on a shared queue and run them.

    Both loops are scheduled as tasks and then awaited together. Each loop
    runs forever, so this coroutine only finishes if one of them raises or
    the program is interrupted.
    """
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())
    # Keep main() alive while the workers run. If main() returned right after
    # create_task(), asyncio.run() would cancel both tasks on shutdown before
    # either loop got a chance to execute.
    await asyncio.gather(producer_task, consumer_task)


if __name__ == "__main__":
    asyncio.run(main())
如果我使用 `await asyncio.sleep()` 代替 `time.sleep()`,代码就能正常工作。
`async` 代码中的任务并不是同时(并行)运行的,而是应该在遇到 `await` 时切换任务——并且看起来需要 `await asyncio.sleep()` 才有时间从生产者切换到消费者,然后再从消费者切换回生产者。
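你在 `put()` 和 `get()` 处也用了 `await`,但我无法解释为什么它在那里不切换任务。也许它切换了,但切换得太快,没有足够的时间把数据送入队列。(补充:一个更可能的原因是,`asyncio.Queue()` 默认没有容量上限,`put()` 无需等待空位就会立即完成,因此这个 `await` 实际上不会挂起协程、也不会把控制权交还给事件循环;而 `time.sleep()` 则会阻塞整个事件循环。)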
import asyncio
import concurrent.futures
import time
class Consumer:
    """Pull detection timestamps off a queue and process them one at a time.

    An event is skipped when its timestamp, minus
    ``_duration_before_restart_ms``, is earlier than the time the previous
    event finished processing.
    """

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        # Window (ms) subtracted from an event's timestamp before it is
        # compared with the last trigger time.
        self._duration_before_restart_ms = 3000
        # Wall-clock time (ms) at which the last event finished processing.
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        """Wait for queued events forever, handling each in arrival order."""
        print('start consumer')
        while True:
            print("Consumer new iteration.")
            event_ms = await self._queue.get()
            print("Consumer new event: ", event_ms)

            threshold_ms = event_ms - self._duration_before_restart_ms
            is_fresh = not (threshold_ms < self._last_triggered_time_ms)
            if is_fresh:
                print("Consumer processing event: ", event_ms)
                # Simulate authentication (an io bound operation) with sleep.
                # Awaiting asyncio.sleep() suspends only this coroutine, so
                # other tasks keep running in the meantime.
                await asyncio.sleep(5)
                self._last_triggered_time_ms = int(time.time() * 1000)
                print(
                    "Consumer processed event: ",
                    event_ms,
                    " at: ",
                    self._last_triggered_time_ms,
                )
            else:
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                print("Consumer skipping event: ", event_ms)
class Producer:
    """Periodically push the current time (ms) onto a queue as a detection.

    A detection is enqueued only while ``counter % 10 > 5`` and only when more
    than ``_detection_time_period_ms`` has passed since the previous one.
    """

    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        # Wall-clock time (ms) of the most recently enqueued detection.
        self._last_detection_time_ms = 0
        # Minimum spacing (ms) between two enqueued detections.
        self._detection_time_period_ms = 3000

    async def producer_loop(self):
        """Tick forever, enqueueing a timestamp when a detection is due."""
        print('start producer')
        tick = 0
        while True:
            # Iterates at 2fps
            # Awaiting asyncio.sleep() hands control back to the event loop,
            # which lets other tasks run between ticks.
            await asyncio.sleep(0.5)
            print("Producer counter: ", tick)
            now_ms = int(time.time() * 1000)

            next_allowed_ms = (
                self._last_detection_time_ms + self._detection_time_period_ms
            )
            if tick % 10 > 5 and next_allowed_ms < now_ms:
                print("Producer adding to queue: ", now_ms)
                await self._queue.put(now_ms)
                print("Producer added to queue: ", now_ms)
                self._last_detection_time_ms = now_ms

            tick += 1
async def main():
    """Start the producer and consumer loops on a shared queue and run them.

    Both loops are scheduled as tasks and then awaited together. Each loop
    runs forever, so this coroutine only finishes if one of them raises or
    the program is interrupted.
    """
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())
    # Wait on both tasks, not just the producer, so that an exception raised
    # inside the consumer loop is propagated here instead of going unnoticed.
    await asyncio.gather(producer_task, consumer_task)


if __name__ == "__main__":
    asyncio.run(main())