夸脱(异步烧瓶)应用程序中的高速公路网络套接字客户端



大家晚上好。我对这个地方并不陌生,但最终决定注册并寻求帮助。我使用Quart框架(异步Flask)开发了一个Web应用程序。现在,随着应用程序变得越来越大,越来越复杂,我决定将不同的过程分离到不同的服务器实例,这主要是因为我想保持Web服务器干净,更抽象且没有计算负载。
因此,我计划使用一个Web服务器和几个(如果需要)相同的过程服务器。所有服务器都基于夸脱框架,目前只是为了简化开发。我决定使用 Crossbar.io 路由器和高速公路将所有服务器连接在一起。

这里出现了问题。 我关注了这篇文章:

使用 autbahn.asyncio.wamp 非阻塞地运行多个应用程序会话

如何实现具有高速公路异步的交互式 websocket 客户端?

我如何将交叉栏客户端(python3,asyncio)与tkinter集成

如何从协议外部发送高速公路/扭曲的WAMP消息?

似乎我尝试了所有可能的方法在我的夸脱应用程序中实现高速公路 websocket 客户端。我不知道如何使它成为可能,所以这两件事都有效,无论 Quart 应用程序是否有效,但高速公路 WS 客户端不起作用,反之亦然。

简化我的夸脱应用程序如下所示:

from quart import Quart, request, current_app
from config import Config
# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import concurrent.futures
class Component(ApplicationSession):
"""
An application component registering RPC endpoints using decorators.
"""
async def onJoin(self, details):
# register all methods on this object decorated with "@wamp.register"
# as a RPC endpoint
##
results = await self.register(self)
for res in results:
if isinstance(res, wamp.protocol.Registration):
# res is an Registration instance
print("Ok, registered procedure with registration ID {}".format(res.id))
else:
# res is an Failure instance
print("Failed to register procedure: {}".format(res))
@wamp.register(u'com.mathservice.add2')
def add2(self, x, y):
return x + y

def create_app(config_class=Config):
app = Quart(__name__)
app.config.from_object(config_class)
# Blueprint registration
from app.main import bp as main_bp
app.register_blueprint(main_bp)
print ("before autobahn start")
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
runner = ApplicationRunner('ws://127.0.0.1:8080 /ws', 'realm1')
future = executor.submit(runner.run(Component))
print ("after autobahn started")
return app

from app import models

在这种情况下,应用程序卡在运行器循环中并且整个应用程序不起作用(无法为请求提供服务),只有当我通过 Ctrl-C 中断运行器(autobahn)循环时,才有可能。

启动后的CMD:

(quart-app) user@car:~/quart-app$ hypercorn --debug --error-log - --access-log - -b 0.0.0.0:8001 tengine:app
Running on 0.0.0.0:8001 over http (CTRL + C to quit)
before autobahn start
Ok, registered procedure with registration ID 4605315769796303

按 Ctrl-C 后:

...
^Cafter autobahn started
2019-03-29T01:06:52 <Server sockets=[<socket.socket fd=11, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]> is serving

如何使夸脱应用程序与高速公路客户端以非阻塞方式一起工作?因此,高速公路打开并保持与Crossbar路由器的websocket连接,并静默地监听后台。

好吧,经过许多个不眠之夜,我终于找到了解决这个难题的好方法。

感谢这篇文章 C-Python asyncio:在线程中运行 discord.py

因此,我像这样重写了我的代码,并能够在内部使用autobahn客户端运行我的Quart应用程序,并且两者都以非阻塞方式积极工作。 整个__init__.py看起来像:

from quart import Quart, request, current_app
from config import Config

def create_app(config_class=Config):
app = Quart(__name__)
app.config.from_object(config_class)
# Blueprint registration
from app.main import bp as main_bp
app.register_blueprint(main_bp)
return app

# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import threading

class Component(ApplicationSession):
"""
An application component registering RPC endpoints using decorators.
"""
async def onJoin(self, details):
# register all methods on this object decorated with "@wamp.register"
# as a RPC endpoint
##
results = await self.register(self)
for res in results:
if isinstance(res, wamp.protocol.Registration):
# res is an Registration instance
print("Ok, registered procedure with registration ID {}".format(res.id))
else:
# res is an Failure instance
print("Failed to register procedure: {}".format(res))

def onDisconnect(self):
print('Autobahn disconnected')
@wamp.register(u'com.mathservice.add2')
def add2(self, x, y):
return x + y

async def start():
runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
await runner.run(Component) # use client.start instead of client.run
def run_it_forever(loop):
loop.run_forever()
asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
loop = asyncio.get_event_loop()
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")

from app import models

在这种情况下,我们使用 autobahn 的 runner.run 创建任务并将其附加到当前循环,然后在新线程中永久运行此循环。

我对当前的解决方案非常满意....但后来发现这个解决方案有一些缺点,这对我来说至关重要,例如:如果连接断开(即交叉路由器变得不可用),请重新连接。使用此方法,如果连接初始化失败或在一段时间后断开,则不会尝试重新连接。此外,对我来说,如何应用程序会话 API 并不明显,即从我的夸脱应用程序中的代码注册/调用 RPC。

幸运的是,我发现了高速公路在其文档中使用的另一个新组件API: https://autobahn.readthedocs.io/en/latest/wamp/programming.html#registering-procedures https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py

它具有自动重新连接功能,并且很容易使用装饰器@component.register('com.something.do')注册RPC的功能,您只需要事先import component即可。

因此,这是__init__.py解决方案的最终视图:

from quart import Quart, request, current_app
from config import Config
def create_app(config_class=Config):
...
return app
from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
import threading

component = Component(
transports=[
{
"type": "websocket",
"url": u"ws://localhost:8080/ws",
"endpoint": {
"type": "tcp",
"host": "localhost",
"port": 8080,
},
"options": {
"open_handshake_timeout": 100,
}
},
],
realm=u"realm1",
)
@component.on_join
def join(session, details):
print("joined {}".format(details))
async def start():
await component.start() #used component.start() instead of run([component]) as it's async function
def run_it_forever(loop):
loop.run_forever()
loop = asyncio.get_event_loop()
#asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
asyncio.get_child_watcher().attach_loop(loop)
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")

from app import models

我希望它能帮助某人。干杯!

最新更新