在带有 cocotb 的列表中生成一个协程



我有一个等待设置事件的协程:

@cocotb.coroutine
def wb_RXDR_read(self):
""" waiting util RXDR is read """
if not self._RXDR_read_flag:
while True:
yield self._RXDR_read_event.wait()
break

我想在超时的情况下"屈服"。然后为了做到这一点,我做了这个:

RXDR_timeout = Timer(250, units="us")
ret = yield [RXDR_timeout, self.wb_RXDR_read()]
if ret == RXDR_timeout:
self._dut._log.error("Timeout on waiting RXDR to be read")
raise TestError()

但是我收到此错误:

2ns ERROR    Coroutine i2c_write yielded something the scheduler can't handle
Got type: <type 'list'> repr: [<cocotb.triggers.Timer object at 0x7f2098cb1350>, <cocotb.decorators.RunningCoroutine object at 0x7f2098cb1610>] str: [<cocotb.triggers.Timer object at 0x7f2098cb1350>, <cocotb.decorators.RunningCoroutine object at 0x7f2098cb1610>]
Did you forget to decorate with @cocotb.coroutine?

我的协程用 @cocotb.coroutine 装饰。如果我单独产生它,那行得通:

yield self.wb_RXDR_read() # <- that works

但我不能把它放在一个列表中。是否可以像 unix select(( 一样将协程放在要阻止的列表中?还是保留给触发器类?

好的,我找到了解决方案。事实上,协程不能像时尚本身那样以选择方式触发。它应该首先作为线程启动,并且要检测协程的结束,必须将.join()方法放在 yield 列表中:

RXDR_timeout = Timer(250, units="us")
RXDR_readth = cocotb.fork(self.wb_RXDR_read())
ret = yield [RXDR_timeout, RXDR_readth.join()]
if ret == RXDR_timeout:
self._dut._log.error("Timeout on waiting RXDR to be read")
raise TestError()

要记住的是:

  • 我们可以产生一个协程
  • 为了产生几个协程,我们必须fork()join()

最新更新