Python asyncio 双向通信硬件控制



我正在树莓派上构建一些硬件,在那里我有许多传感器和一堆执行器。我想使用 Asyncio 协程来持续监控多个传感器(本质上是这样我就不必从主循环轮询(,然后用一堆执行器做一些事情。

我打算为每个传感器创建一个类,它将具有以下代码中的协程等方法。

我想将传感器方法的结果产生到某个变量,然后我可以对其进行操作。

我的问题是,如果我有多个协程写入一个地方,我该如何安全地做到这一点。异步中的队列似乎是一对一的,而不是多对一的 - 这是正确的吗?

最终我不明白如何让多个协程返回到一个地方,有一些逻辑,然后将消息发送到其他协程

+------------+
|            |                                +------------+
|  Sensor 1  +-------+                        |            |
|            |       |                    +--->  actuator1 |
+------------+       |                    |   |            |
|                    |   +------------+
+------------+       |      +-----------+ |
|            |       |      |           | |
|   Sensor 2 +------------> |  logic    +-+
|            |       |      |           | |
+------------+       |      +-----------+ |
|                    |   +------------+
+------------+       |                    |   |            |
|            |       |                    +--->  actuator2 |
|  Sensor 3  +-------+                        |            |
|            |                                +------------+
+------------+

以上代表了我想要实现的目标。我知道我可以通过轮询和 while 循环来实现这一点,但我喜欢尝试异步/事件驱动方法的想法。

import asyncio
import random
async def sensor(queue):
while True:
# Get some sensor data
sensor_data = "data"
await queue.put(sensor_data)

async def actuator(queue):
while True:
# wait for an item from the producer
item = await queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# process the item
print('consuming item {}...'.format(item))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
sensor_coro = sensor(queue)
actuator_coro = actuator(queue)
loop.run_until_complete(asyncio.gather(sensor_coro, actuator_coro))
loop.close()

我的问题是,如果我有多个协程写入一个地方,我该如何安全地做到这一点。异步中的队列似乎是一对一的,而不是多对一的 - 这是正确的吗?

这是不正确的;异步队列是多生产者多使用者。若要实现关系图的逻辑,需要两个同步基元:

  • 一个队列,由sensor协程的多个实例填充,并由logic()协程的单个实例清空

  • 每个执行器都有一个额外的同步装置。哪种设备在这里最好取决于要求。例如,是否允许执行器"丢失"比它们能够响应的速度更快的消息?或者,logic应该等待吗?根据这些,logic()和每个执行器之间的同步将是一个简单的Event(甚至只是一个Future(或另一个队列。

假设每个执行器使用队列,则logic协程可能如下所示:

async def logic(sensor_queue, actuator_queues):
while True:
item = await queue.get()
# process the item and signal some actuators
await actuator_queues[0].put(x1)
await actuator_queues[1].put(x2)

您可以考虑查看古玩库。该文档对于理解异步编程模型非常有帮助,并且阅读起来很有趣。

对于您的特定情况,我会研究任务组。此处解释了如何使用任务组以更高级的方式等待任务返回。

最新更新