如何将C函数实现为可重写的(协程)



环境:C和microython虚拟机中的协作RTOS是任务之一。

为了使VM不阻塞其他RTOS任务,我在vm.c:DISPATCH()中插入RTOS_sleep(),以便在执行每个字节码后,VM将控制权交给下一个RTOS任务。

我创建了一个uPy接口,使用生产者-消费者设计模式从物理数据总线异步获取数据,可以是CAN、SPI、以太网。

uPy中的用法:

can_q = CANbus.queue()
message = can_q.get()

C中的实现使得can_q.get()不阻塞RTOS:它轮询C队列,如果没有收到消息,它调用RTOS_sleep()给另一个任务填充队列的机会。事情是同步的,因为C队列只由另一个RTOS任务更新,而RTOS任务只在调用RTOS_sleep()时切换,即协作

C的实现基本上是:

// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep(); 
return c_queue_get_message();

尽管Python语句can_q.get()没有阻止RTOS,但它确实阻止了uPy脚本。我想重写它,这样我就可以将它与async def一起使用,即协同程序,并且不让它阻塞uPy脚本。

不确定语法,但类似于以下内容:

can_q = CANbus.queue()
message = await can_q.get()

问题

如何编写一个C函数,以便在上面使用await

我更喜欢CPython和microython的答案,但我会接受仅CPython的回答。

注意:这个答案涵盖了CPython和异步框架。然而,这些概念应该适用于其他Python实现以及其他异步框架。

如何编写一个C函数,以便对其执行await

编写一个可以等待结果的C函数的最简单方法是让它返回一个已经不可用的对象,例如asyncio.Future。在返回Future之前,代码必须安排将来的结果由某种异步机制设置。所有这些基于协程的方法都假设您的程序在某个事件循环下运行,该事件循环知道如何安排协程。

但是,返回一个未来并不总是足够的——也许我们想定义一个具有任意数量的悬挂点的对象。返回future只挂起一次(如果返回的future不完整),在future完成后恢复,仅此而已。一个相当于包含多个awaitasync def的不可用对象不能通过返回future来实现,它必须实现协程通常实现的协议。这有点像实现自定义__next__的迭代器,可以用来代替生成器。

定义自定义可用

要定义我们自己的awaitable类型,我们可以求助于PEP492,它确切地指定了哪些对象可以传递给await。除了用async def定义的Python函数外,用户定义的类型还可以通过定义__await__特殊方法使对象不可用,Python/C将该方法映射到PyTypeObject结构的tp_as_async.am_await部分。

这意味着在Python/C中,您必须执行以下操作:

  • 为扩展类型的tp_as_async字段指定一个非NULL值
  • 使其am_await成员指向一个C函数,该函数接受您类型的实例并返回实现迭代器协议的另一个扩展类型的实例,即定义tp_iter(通常定义为PyIter_Self)和tp_iternext
  • 迭代器的CCD_ 25必须推进协程的状态机。来自tp_iternext的每个非异常返回都对应于一个挂起,而最终的StopIteration异常表示来自协同程序的最终返回。返回值存储在StopIterationvalue属性中

为了使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在挂起后何时恢复。asyncio定义的大多数协程都希望在asyncio事件循环下运行,并在内部使用asyncio.get_event_loop()(和/或接受显式loop参数)来获得其服务。

协同程序示例

为了说明Python/C代码需要实现什么,让我们考虑用Pythonasync def表示的简单协程,例如asyncio.sleep():

async def my_sleep(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
await future
# we get back here after the timeout has elapsed, and
# immediately return

my_sleep创建一个Future,安排它在n秒内完成(其结果变为已设置),并挂起它自己,直到将来完成。最后一部分使用await,其中await x的意思是"允许x决定我们现在是暂停还是继续执行"。一个不完整的未来总是决定挂起,异步Task协同程序驱动程序的特殊情况产生了无限期挂起它们的未来,并将它们的完成与恢复任务联系起来。其他事件循环(curio等)的暂停机制可能在细节上有所不同,但其基本思想是相同的:await是可选的执行暂停。

返回生成器的__await__()

要将其转换为C,我们必须去掉神奇的async def函数定义,以及await挂接点。删除async def相当简单:等效的普通函数只需要返回一个实现__await__:的对象

def my_sleep(n):
return _MySleep(n)
class _MySleep:
def __init__(self, n):
self.n = n
def __await__(self):
return _MySleepIter(self.n)

my_sleep()返回的_MySleep对象的__await__方法将由await运算符自动调用,以将不可用的对象(传递给await的任何对象)转换为迭代器。这个迭代器将用于询问等待的对象是选择挂起还是提供值。这与for o in x语句调用x.__iter__()可迭代的x转换为具体的迭代器的方式非常相似。

当返回的迭代器选择挂起时,它只需要生成一个值。值的含义(如果有的话)将由协程驱动程序来解释,协程驱动通常是事件循环的一部分。当迭代器选择停止执行并从await返回时,它需要停止迭代。使用生成器作为方便的迭代器实现,_MySleepIter看起来像这样:

def _MySleepIter(n):
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_later(n, future.set_result, None)
# yield from future.__await__()
for x in future.__await__():
yield x

由于await x映射到yield from x.__await__(),我们的生成器必须耗尽future.__await__()返回的迭代器。如果future不完整,Future.__await__返回的迭代器将产生结果,否则返回future的结果(此处我们忽略,但yield from实际上提供了结果)。

返回自定义迭代器的__await__()

在C中实现my_sleep的最后一个障碍是使用_MySleepIter的生成器。幸运的是,任何生成器都可以被转换为一个有状态迭代器,其__next__执行该段代码直到下一个等待或返回。__next__实现生成器代码的状态机版本,其中yield通过返回值来表示,return通过提升StopIteration来表示。例如:

class _MySleepIter:
def __init__(self, n):
self.n = n
self.state = 0
def __iter__(self):  # an iterator has to define __iter__
return self
def __next__(self):
if self.state == 0:
loop = asyncio.get_event_loop()
self.future = loop.create_future()
loop.call_later(self.n, self.future.set_result, None)
self.state = 1
if self.state == 1:
if not self.future.done():
return next(iter(self.future))
self.state = 2
if self.state == 2:
raise StopIteration
raise AssertionError("invalid state")

翻译成C

以上是一些类型,但它是有效的,并且只使用可以用本机Python/C函数定义的构造。

实际上,将这两个类翻译成C非常简单,但超出了这个答案的范围。

最新更新