存储事件后,异步事件等待功能不会继续



我正在研究通过Websocket给出响应后应继续的功能。为了实现这一目标,我使用一个异步事件,该事件将在给出响应后设置。

涉及三个功能:

async def send(self, message):
    await self.channel(message.toJSON())
    if (message.method == 'get' or message.method == 'post'):
        event = asyncio.Event()
        self._queueMessage(message, event)
        await event.wait()
    print('continue')
def _queueMessage(self, message, event):
    self.queue.append([message, event])
def _process_response_message(self, message):
    for entry in self.queue:
        if (message['_id'] == entry[0]._id):
            print(entry[1])
            entry[1].set()
            print(entry[1])
            return

返回:

<asyncio.locks.Event object at 0x7f3a1ff20da0 [unset,waiters:1]>
<asyncio.locks.Event object at 0x7f3a1ff20da0 [set,waiters:1]>

在此示例中,print('contine')函数从未被调用,我不这样做,因为在调用之前使用.set()函数,并且.set()似乎可以正常工作。等待event.wait()。

我缺少什么?

基于消息,您获得的_process_response_message似乎在另一个线程中运行。asyncio.Event不是线程安全对象,您应该使用loop.call_soon_threadsafe函数从其他线程调用其方法。尝试这样更改您的代码:

async def send(self, message):
    await self.channel(message.toJSON())
    if (message.method == 'get' or message.method == 'post'):
        loop = asyncio.get_event_loop()
        event = asyncio.Event()
        self._queueMessage(message, loop, event)
        await event.wait()
    print('continue')
def _queueMessage(self, message, loop, event):
    self.queue.append([message, loop, event])
def _process_response_message(self, message):
    for entry in self.queue:
        qmsg, loop, event = entry
        if (message['_id'] == qmsg._id):
            loop.call_soon_threadsafe(event.set)
            return

最新更新