sqlalchemy.exc.OperationalError: (psycopg2.OperationalError)



我有一个基于Flask的后端应用程序,当在本地运行代码时,这个错误不会发生,但是当在kubernetes pod中部署在我的服务器中时,它确实会发生。

有趣的是,当您第一次运行代码时,它立即失败(当运行Future时),但当您再次运行它时,错误仍然发生,但代码继续(并成功完成)。很奇怪。我得到的错误是:
2021-07-30 14:59:23,537 - mit_backend.logic.optimize - INFO - Inside first pass run code
2021-07-30 14:59:23,587 - mit_backend.logic.optimize - INFO - Optimising region ML10F   
2021-07-30 14:59:23,588 - mit_backend.logic.optimize - INFO - Optimising region ML47F   
2021-07-30 14:59:23,589 - mit_backend.logic.optimize - INFO - Optimising region ML40F   
--- Logging error ---
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL error: wrong version number

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 175, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
File "/code/mit_backend/logic/optimize.py", line 241, in optimize_first_pass_run
config = get_csv_config(user_identity)
File "/code/mit_backend/modules/v1/__init__.py", line 623, in get_csv_config
res = CSVConfig.query.filter(CSVConfig.user_id == user_identity).first()
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3429, in first
ret = list(self[0:1])
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
return list(res)
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: wrong version number
[SQL: SELECT csv_config.user_id AS csv_config_user_id, csv_config."csvHeaders" AS "csv_config_csvHeaders", csv_config."deliveryName" AS "csv_config_deliveryName", csv_config.id AS csv_config_id, csv_config.line1 AS csv_config_line1, csv_config.line2 AS csv_config_line2, csv_config.quantity AS csv_config_quantity, csv_config."routeNumber" AS "csv_config_routeNumber", csv_config.suburb AS csv_config_suburb, csv_config.weight AS csv_config_weight 
FROM csv_config 
WHERE csv_config.user_id = %(user_id_1)s 
LIMIT %(param_1)s]
[parameters: {'user_id_1': 'sftp', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/code/mit_backend/logic/optimize.py", line 300, in first_pass_run
future_df_result = future.result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: wrong version number
[SQL: SELECT csv_config.user_id AS csv_config_user_id, csv_config."csvHeaders" AS "csv_config_csvHeaders", csv_config."deliveryName" AS "csv_config_deliveryName", csv_config.id AS csv_config_id, csv_config.line1 AS csv_config_line1, csv_config.line2 AS csv_config_line2, csv_config.quantity AS csv_config_quantity, csv_config."routeNumber" AS "csv_config_routeNumber", csv_config.suburb AS csv_config_suburb, csv_config.weight AS csv_config_weight 
FROM csv_config 
WHERE csv_config.user_id = %(user_id_1)s 
LIMIT %(param_1)s]
[parameters: {'user_id_1': 'sftp', 'param_1': 1}]
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib64/python3.6/logging/__init__.py", line 994, in emit
msg = self.format(record)
File "/usr/lib64/python3.6/logging/__init__.py", line 840, in format
return fmt.format(record)
File "/usr/lib64/python3.6/logging/__init__.py", line 577, in format
record.message = record.getMessage()
File "/usr/lib64/python3.6/logging/__init__.py", line 338, in getMessage
msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
File "/usr/lib64/python3.6/threading.py", line 884, in _bootstrap
self._bootstrap_inner()
File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib64/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.6/socketserver.py", line 654, in process_request_thread
self.finish_request(request, client_address)
File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
self.handle()
File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 345, in handle
BaseHTTPRequestHandler.handle(self)
File "/usr/lib64/python3.6/http/server.py", line 418, in handle
self.handle_one_request()
File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 379, in handle_one_request
return self.run_wsgi()
File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 323, in run_wsgi
execute(self.server.app)
File "/usr/local/lib/python3.6/site-packages/werkzeug/serving.py", line 312, in execute
application_iter = app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2464, in __call__
return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.6/site-packages/flask_restx/api.py", line 375, in wrapper
resp = resource(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/flask/views.py", line 89, in view
return self.dispatch_request(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/flask_restx/resource.py", line 44, in dispatch_request
resp = meth(*args, **kwargs)
File "/code/mit_backend/modules/v1/controllers/manifest.py", line 161, in post
files, df, sftp_date_latest = do_optimize(addresses, user_identity)
File "/code/mit_backend/logic/optimize.py", line 38, in do_optimize
df = first_pass_run(df, user_identity)
File "/code/mit_backend/logic/optimize.py", line 307, in first_pass_run
LOG.error("Exception is ", e.__cause__)
Message: 'Exception is '
Arguments: (_RemoteTraceback('n"""nTraceback (most recent call last):n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_contextn    cursor, statement, parameters, contextn  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_executen    cursor.execute(statement, parameters)npsycopg2.OperationalError: SSL error: decryption failed or bad record macnnnThe above exception was the direct cause of the following exception:nnTraceback (most recent call last):n  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 175, in _process_workern    r = call_item.fn(*call_item.args, **call_item.kwargs)n  File "/code/mit_backend/logic/optimize.py", line 241, in optimize_first_pass_runn    config = get_csv_config(user_identity)n  File "/code/mit_backend/modules/v1/__init__.py", line 623, in get_csv_confign    res = CSVConfig.query.filter(CSVConfig.user_id == user_identity).first()n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3429, in firstn    ret = list(self[0:1])n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__n    return list(res)n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__n    return self._execute_and_instances(context)n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instancesn    result = conn.execute(querycontext.statement, self._params)n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in executen    return meth(self, multiparams, params)n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connectionn    return connection._execute_clauseelement(self, multiparams, params)n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelementn    distilled_params,n  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_contextn    e, statement, parameters, cursor, contextn  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exceptionn    sqlalchemy_exception, with_traceback=exc_info[2], from_=en  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_n    raise exceptionn  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_contextn    cursor, statement, parameters, contextn  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_executen    cursor.execute(statement, parameters)nsqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record macnn[SQL: SELECT csv_config.user_id AS csv_config_user_id, csv_config."csvHeaders" AS "csv_config_csvHeaders", csv_config."deliveryName" AS "csv_config_deliveryName", csv_config.id AS csv_config_id, csv_config.line1 AS csv_config_line1, csv_config.line2 AS csv_config_line2, csv_config.quantity AS csv_config_quantity, csv_config."routeNumber" AS "csv_config_routeNumber", csv_config.suburb AS csv_config_suburb, csv_config.weight AS csv_config_weight nFROM csv_config nWHERE csv_config.user_id = %(user_id_1)s n LIMIT %(param_1)s]n[parameters: {'user_id_1': 'sftp', 'param_1': 1}]n(Background on this error at: http://sqlalche.me/e/13/e3q8)n"""',),)

错误发生在期货ProcessPoolExecutor:

def first_pass_run(df, user_identity):
LOG.info("Inside first pass run code")
first_json = True
df_final = None
df_grouped = df.groupby('Route Number')
auth_claim = get_authorization_claims_from_header()
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
_futures = []
for region_name, _df in df_grouped:
_futures.append(
executor.submit(
optimize_first_pass_run, region_name, _df, user_identity, auth_claim
))
for future in concurrent.futures.as_completed(_futures):
try:
future_df_result = future.result()
if first_json:
df_final = future_df_result
first_json = False
else:
df_final = df_final.append(future_df_result, ignore_index=True)
except Exception as e:
LOG.error("Exception is ", e.__cause__)

我以悲观的方式启动我的SQLAlchemy引擎:

pg_db = SQLAlchemy(app)
engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'], pool_pre_ping=True)

代码没有在WSGI或gunicorn上运行,它就像这样简单:

if __name__ == '__main__':
LOG.info('running environment: %s', os.environ.get('ENV', 'production'))
app.config['DEBUG'] = os.environ.get('ENV') == 'development'
app.run(host='0.0.0.0', port=5001, debug=False, threaded=True)

知道为什么会发生这种情况吗?谢谢你。

我还没能找到一个适当的修复,但我实现了以下事情,虽然异常仍然被抛出,代码重试并最终成功:

删除连接池

engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'], poolclass=NullPool)

事件侦听器

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
connection_record.info['pid'] = os.getpid()

@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
(connection_record.info['pid'], pid)
)

但是,根据文档,上面的两个定义涉及到处理连接的乐观方式和使用Pools

初始化Process Pool前重新连接

engine.dispose()
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
_futures = []
for region_name, _df in df_grouped:
_futures.append(
executor.submit(
optimize_first_pass_run, region_name, _df, user_identity, auth_claim, True
))
for future in concurrent.futures.as_completed(_futures):
future_df_result = future.result()

当尝试在数据库中插入对象时添加重试

def add_new_record(row):
attempts = 0
while attempts <= 5:
attempts = attempts + 1
try:
pg_db.session.add(row)
pg_db.session.commit()
return True
except (SQLAlchemyError, psycopg2.OperationalError, sqlalchemy.exc.OperationalError) as e:
if attempts < 5:
LOG.warn("Attempt failed. Trying rollback")
pg_db.session.rollback()
LOG.warn(e.__cause__)
LOG.warn(e.__str__())
time.sleep(5)
else:
LOG.error("Maximum number of retries reached. Raising an error")
raise e
return False

最新更新