从烧瓶查询蜂巢



我是烧瓶的新手,我正在使用以下烧瓶饼干切割机从快速原型开始。项目的主要思想是从蜂巢集群收集数据,并使用烧瓶将其推送给最终用户。

虽然,我能够使用连接器成功地将烧瓶连接到 Hive 服务器pyhive但我遇到了一个奇怪的问题,这与我尝试查询 50 多个项目的select limit有关。

就我而言,我只构建了类似于烧瓶扩展开发的 Hive 类pyhive类似的演示:

from pyhive import hive
from flask import current_app
# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack

class Hive(object):
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
# Use the newstyle teardown_appcontext if it's available,
# otherwise fall back to the request context
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def connect(self):
return hive.connect(current_app.config['HIVE_DATABASE_URI'], database="orc")
def teardown(self, exception):
ctx = stack.top
if hasattr(ctx, 'hive_db'):
ctx.hive_db.close()
return None
@property
def connection(self):
ctx = stack.top
if ctx is not None:
if not hasattr(ctx, 'hive_db'):
ctx.hive_db = self.connect()
return ctx.hive_db

并创建了一个终结点来从 Hive 加载数据:

@blueprint.route('/hive/<limit>')
def connect_to_hive(limit):
cur = hive.connection.cursor()
query = "select * from part_raw where year=2018 LIMIT {0}".format(limit)
cur.execute(query)
res = cur.fetchall()
return jsonify(data=res)

在第一次运行时,如果我尝试加载限制为 50 个项目的东西,一切正常,但是一旦我增加它就会保持没有任何加载的状态。但是,当我使用 jupyter 笔记本加载数据时,它工作正常,这就是为什么我怀疑我可能会从我的烧瓶代码中遗漏某些内容。

问题是库版本问题,通过在我的需求中添加以下内容来解决此问题:

# Hive with needed dependencies
sasl==0.2.1
thrift==0.11.0
thrift-sasl==0.3.0
PyHive==0.6.1

旧版本如下:

sasl>=0.2.1
thrift>=0.10.0
#thrift_sasl>=0.1.0
git+https://github.com/cloudera/thrift_sasl  # Using master branch in order to get Python 3 SASL patches
PyHive==0.6.1

如 pyhive 项目中的开发需求文件中所述。

最新更新