嗨,我有一个使用Celery Flask SqlAlchemy的设置,我间歇性地收到这个错误:
(psycopg2.DatabaseError) SSL error: decryption failed or bad record mac
我关注了这个帖子:
Celery+SQLAlchemy:数据库错误:(数据库错误)SSL错误:解密失败或记录错误mac
还增加了一些预运行和后运行方法:
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
d.session.remove()
@task_prerun.connect
def on_task_init(*args, **kwargs):
d.engine.dispose()
但我仍然看到这个错误。有人解决了这个问题吗?
请注意,我在AWS上运行这个(两个服务器访问同一个数据库)。数据库本身托管在它自己的服务器(而不是RDS)上。我相信运行的芹菜背景任务总数是6(2+4)。烧瓶前端正在使用gunicorn运行。
我的相关线程:https://github.com/celery/celery/issues/3238#issuecomment-225975220
以下是我的评论以及其他信息:
我在AWS上使用了Celery、SQLAlchemy和PostgreSQL,没有这样的问题。我唯一能想到的区别是我有RDS上的数据库。我想你可以试着切换到RDS临时,只是为了测试这个问题是否仍然存在。如果它与RDS解除连接,那么您需要查看PostgreSQL设置。
根据RDS参数,我启用了SSL:
ssl = 1, Enables SSL connections.
ssl_ca_file = /rdsdbdata/rds-metadata/ca-cert.pem
ssl_cert_file = /rdsdbdata/rds-metadata/server-cert.pem
ssl_ciphers = false, Sets the list of allowed SSL ciphers.
ssl_key_file = /rdsdbdata/rds-metadata/server-key.pem
ssl_renegotiation_limit = 0, integer, (kB) Set the amount of traffic to send and receive before renegotiating the encryption keys.
至于Celery初始化代码,大致是这个
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
import sqldb
engine = sqldb.get_engine()
cached_data = None
def do_the_work():
global engine, ruckus_data
if cached_data is not None:
return cached_data
db_session = None
try:
db_session = scoped_session(sessionmaker(
autocommit=False, autoflush=False, bind=engine))
data = sqldb.get_session().query(
sqldb.system.MyModel).filter_by(
my_type = sqldb.system.MyModel.TYPEA).all()
cached_data = {}
for row in data:
... # put row into cached_data
finally:
if db_session is not None:
db_session.remove()
return cached_data
这个do_the_work
函数然后从celence任务中调用。sqldb.get_engine
看起来像这样:
from sqlalchemy import create_engine
_engine = None
def get_engine():
global _engine
if _engine:
return _engine
_engine = create_engine(config.SQL_DB_URL, echo=config.SQL_DB_ECHO)
return _engine
最后,配置模块中的SQL_DB_URI和SQL_DB_ECHO如下:
SQL_DB_URL = 'postgresql+psycopg2://%s:%s@%s/%s' % (
POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DB_NAME)
SQL_DB_ECHO = False