使用asyncio使websocket回调异步



我正在尝试使用Python 3.5.2的asyncio和websockets实现一个基本的websocket客户端。

基本上,我希望connect_to_dealer是一个阻塞调用,但等待websocket消息在不同的线程上。

在阅读了一些文档(我对Python的经验很少)之后,我得出结论,asyncio.ensure_future()传递一个协程(listen_for_message)是可行的方法。

现在,我在不同的线程上运行listen_for_message,但是从协程中,我似乎不能使用await或任何其他机制使调用同步。如果我这样做,即使是一个简单的sleep,执行也会永远等待(它挂起)。

我想知道我做错了什么。

async def listen_for_message(self, future, websocket):
    while (True):
        try:
            await asyncio.sleep(1) # It hangs here
            print('Listening for a message...')
            message = await websocket.recv() # If I remove the sleep, hangs here
            print("< {}".format(message))
            future.set_result(message)
            future.done()
        except websockets.ConnectionClosed as cc:
            print('Connection closed')
        except Exception as e:
            print('Something happened')
def handle_connect_message(self, future):
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
    print(future.result)
async def connect_to_dealer(self):
    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket'))
    hello_message = await websocket.recv()
    print("< {}".format(hello_message))
    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
    if sub_response.status_code == 200:
        print('Now we're observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))
    # Now we need to handle messages but continue with the regular execution
    try:
        future = asyncio.get_event_loop().create_future()
        future.add_done_callback(self.handle_connect_message)
        asyncio.ensure_future(self.listen_for_message(future, websocket))
    except Exception as e:
        print(e)

您需要使用显式期货的具体原因是什么?

对于asyncio,您可以使用coroutinesTasks的组合来实现大多数目的。任务本质上是包装的协同程序,在后台运行,独立于其他异步代码,所以你不必显式地管理它们的流程或与其他代码位混淆。

我不完全确定您的最终目标,但也许下面阐述的方法可以为您提供一些工作:

import asyncio
async def listen_for_message(websocket):
    while True:
        await asyncio.sleep(0)
        try:
            print('Listening for a message...')
            message = await websocket.recv()
            print("< {}".format(message))
        except websockets.ConnectionClosed as cc:
            print('Connection closed')
        except Exception as e:
            print('Something happened')

async def connect_to_dealer():
    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket')
    hello_message = await websocket.recv()
    print("< {}".format(hello_message))
    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
        user_id='username', connection_id=connection_id), headers=headers)
    if sub_response.status_code == 200:
        print('Now we're observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))

async def my_app():
    # this will block until connect_to_dealer() returns
    websocket = await connect_to_dealer()
    # start listen_for_message() in its own task wrapper, so doing it continues in the background
    asyncio.ensure_future(listen_for_message(websocket))
    # you can continue with other code here that can now coexist with listen_for_message()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(my_app())
    loop.run_forever() 

相关内容

  • 没有找到相关文章

最新更新