我正在尝试使用Crossbar/autobahn的RPC通过websockets传输大数据。我的设置如下:
- 蟒蛇 2.7
- 交叉路由器(版本 17.8.1.post1(
- 一个后端,它将尝试将大型熊猫数据帧作为 json 字符串发送
- 想要接收此字符串的前端
从本质上讲,我的前端正在尝试调用一个将返回大字符串的函数。
class MyComponent(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("session ready")
try:
res = yield self.call(u'data.get')
我收到此错误:
2017-08-09T16:38:10+0200 session closed with reason wamp.close.transport_lost [WAMP transport was lost without closing the session before]
2017-08-09T16:38:10+0200 Cancelling 1 outstanding requests
2017-08-09T16:38:10+0200 call error: ApplicationError(error=<wamp.close.transport_lost>, args=[u'WAMP transport was lost without closing the session before'], kwargs={}, enc_algo=None)
似乎交叉杆正在把我踢出去,因为我的客户会话在他看来已经死了,但我认为高速公路会分块我的数据,并且调用不会阻塞客户端反应堆。
我在交叉栏配置中启用了一些东西来改善websocket处理;多亏了这一点,我能够传输更多的数据,但最终我会达到一个限制(配置文件主要是从sam和max复制和粘贴的(。
"options": {
"enable_webstatus": false,
"max_frame_size": 16777216,
"auto_fragment_size": 65536,
"fail_by_drop": true,
"open_handshake_timeout": 2500,
"close_handshake_timeout": 1000,
"auto_ping_interval": 10000,
"auto_ping_timeout": 5000,
"auto_ping_size": 4,
"compression": {
"deflate": {
"request_no_context_takeover": false,
"request_max_window_bits": 11,
"no_context_takeover": false,
"max_window_bits": 11,
"memory_level": 4
}
}
}
有什么想法,采取,我做错了什么?
谢谢
客户端代码:
from __future__ import print_function
import pandas as pd
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks
class MyComponent(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("session ready")
try:
res = yield self.call(u'data.get')
print('Got the data')
data = pd.read_json(res)
print("call result: {}".format(data.head()))
print("call result shape: {0}, {1}".format(*data.shape))
except Exception as e:
print("call error: {0}".format(e))
if __name__ == "__main__":
from autobahn.twisted.wamp import ApplicationRunner
runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
runner.run(MyComponent)
后端代码
from __future__ import absolute_import, division, print_function
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet import reactor, defer, threads
# Imports
import pandas as pd
def get_data():
"""Returns a DataFrame of stuff as a JSON
:return: str, data as a JSON string
"""
data = pd.DataFrame({
'col1': pd.np.arange(1000000),
'col2': "I'm big",
'col3': 'Like really big',
})
print("call result shape: {0}, {1}".format(*data.shape))
print(data.memory_usage().sum())
print(data.head())
return data.to_json()
class MyBackend(ApplicationSession):
def __init__(self, config):
ApplicationSession.__init__(self, config)
@inlineCallbacks
def onJoin(self, details):
# Register a procedure for remote calling
@inlineCallbacks
def async_daily_price(eqt_list):
res = yield threads.deferToThread(get_data)
defer.returnValue(res)
yield self.register(async_daily_price, u'data.get')
if __name__ == "__main__":
from autobahn.twisted.wamp import ApplicationRunner
runner = ApplicationRunner(url=u"ws://127.0.0.1:8080/ws", realm=u"realm1")
runner.run(MyBackend)
配置
{
"version": 2,
"controller": {},
"workers": [
{
"type": "router",
"realms": [
{
"name": "realm1",
"roles": [
{
"name": "anonymous",
"permissions": [
{
"uri": "",
"match": "prefix",
"allow": {
"call": true,
"register": true,
"publish": true,
"subscribe": true
},
"disclose": {
"caller": false,
"publisher": false
},
"cache": true
}
]
}
]
}
],
"transports": [
{
"type": "universal",
"endpoint": {
"type": "tcp",
"port": 8080
},
"rawsocket": {
},
"websocket": {
"ws": {
"type": "websocket",
"options": {
"enable_webstatus": false,
"max_frame_size": 16777216,
"auto_fragment_size": 65536,
"fail_by_drop": true,
"open_handshake_timeout": 2500,
"close_handshake_timeout": 1000,
"auto_ping_interval": 10000,
"auto_ping_timeout": 5000,
"auto_ping_size": 4,
"compression": {
"deflate": {
"request_no_context_takeover": false,
"request_max_window_bits": 11,
"no_context_takeover": false,
"max_window_bits": 11,
"memory_level": 4
}
}
}
}
},
"web": {
"paths": {
"/": {
"type": "static",
}
}
}
}
]
}
]
}
crossbar.io 小组建议的解决方案是使用 RPC 的渐进结果选项。
一个完整的工作示例位于 https://github.com/crossbario/autobahn-python/tree/master/examples/twisted/wamp/rpc/progress
在我的代码中,我必须在后端添加结果的分块
step = 10000
if details.progress and len(res) > step:
for i in xrange(0, len(res), step):
details.progress(res[i:i+step])
else:
defer.returnValue(res)
并致来电者
res = yield self.call(
u'data.get'
options=CallOptions(
on_progress=partial(on_progress, res=res_list)
)
)
我的函数on_progress将块添加到结果列表的位置
def on_progress(x, res):
res.append(x)
选择正确的块大小就可以了。