如何使用与redis的烧瓶连接



这是我的main.py文件:

from flask_redis import FlaskRedis
from controllers.utils import redis_conn # get Redis URL
import connexion
BASE_PATH = "/"
def create_app(*specs):
_app = connexion.App(__name__)
for s in specs:
logger.info("Adding specs {}".format(s))
_app.add_api(s, validate_responses=True)
return _app
app = create_app("specs.yaml")
rd_app = app.app
rd_app.config['REDIS_URL'] = redis_conn()
redis_client = FlaskRedis(rd_app)

if __name__ == '__main__':
app.run(host='127.0.0.1', port=5000, debug = True)

Redis似乎有问题并产生了这个错误:

ImportError:无法从部分导入名称"redis_client"初始化模块"main"(很可能是由于循环导入(

我找不到任何使用Redis连接的教程。用法示例get_fruit.py:

from main import redis_client
def get_fruit(colour, shape, taste):
hash_name = rd_collections[colour+'_'+shape]
key_name = '{}:{}'.format(hash_name, taste)
response_redis = redis_client.get(name=key_name)
if response_redis is None:
result = get_fruit_name(colour, shape, taste)
logger.debug("Updating Redis by adding {}".format(location_id))
redis_client.set(name=key_name, value=json.dumps(result['fruit_id']), ex=60*60)
result = OrderedDict({'Result': result})
return result
else:
...

更新:

按照建议尝试:

def create_app(*specs):
"""
Running apps using connexion
"""
_app = connexion.App(__name__)
rd_app = _app.app
rd_app.config['REDIS_URL'] = redis_conn()
rd_client = FlaskRedis(rd_app)
for s in specs:
logger.info("Adding specs {}".format(s))
_app.add_api(s, validate_responses=True)
return _app, rd_client
app, redis_client = create_app("specs.yaml")

但仍然产生相同的错误。

避免循环导入的标准方法如下。

使用您的配置创建一个config.py文件。

from controllers.utils import redis_conn # get Redis URL
class MyConfig(object):
REDIS_URL = redis_conn()

创建一个wsgi.py文件。这将是你的应用程序的起点。

from config import MyConfig
import app_factory
app = app_factory.create_app(MyConfig)
if __name__ == "__main__":
with app.app_context():
app.run()

使用AppFactory模式创建app_factory.py文件:

redis_client = FlaskRedis()
def create_app(config):
app = Flask(__name__)
app.config.from_object(config)
redis_client.init_app(app)
return app

用你的路线创建一个文件routes.py:

from app_factory import redis_client
@bp.route('/')
def index():
return redis_client.get('potato')

相关内容

  • 没有找到相关文章

最新更新