Localstack运行aws lambda (python)与sqlalchemy过滤器查询-导致异常



我创建了一个AWS lambda函数,它使用sqlalchemy连接到mysql数据库。

代码运行成功无论是在我的开发站上(从main调用lambda函数),还是从AWS本身作为lambda。

我正在使用localstack docker构建一个测试。我在localstack上创建了lambda,然后调用了它。sqlalchemy查询数据库失败只有一个例外。如果我将sqlalchemy查询修改为一个简单的查询,它可以正常工作。

注意:

失败的函数是get_rows_by_status_and_age,这是lambda代码:


db_connection_str=f'mysql+pymysql://{db_user}:{db_pass}@{db_host}/{db_schema}'
def db_init():
engine = create_engine(db_connection_str, echo=False, isolation_level="READ COMMITTED")
# logger.info(db_connection_str)
# logger.info(engine.dialect)
sm = sessionmaker(bind=engine, expire_on_commit=False)
session = sm()
return engine, session
def get_rows_by_status_and_age(session, status_list, min_age_minutes):
time_current = datetime.datetime.now(datetime.timezone.utc)
time_old = time_current - datetime.timedelta(minutes=min_age_minutes)
try:
q = session.query(func.count(Execution.id)).filter(
Execution.status.in_(status_list),
Execution.updated_at < time_old)
return q.scalar()
except Exception as e:
logger.error("failed with error: {}".format(e))
raise
def lambda_handler(event, context):
engine, session = db_init()
try:
count = get_rows_by_status_and_age(session, [1,6,7], 20)
logger.info(count)
except Exception as e:
logger.info('Error in the lambda:')
logger.exception(e)

if __name__ == '__main__':  # pragma: no cover
logging.basicConfig()
lambda_handler("", "")

上述查询的例外是KeyError:

localstack_container            | 2021-08-24T08:00:48:ERROR:root: failed with error: <function comma_op at 0x7fd9677e0440>
localstack_container            | 2021-08-24T08:00:48:INFO:root: Error in the lambda:
localstack_container            | 2021-08-24T08:00:48:ERROR:root: <function comma_op at 0x7fd9677e0440>
localstack_container            | Traceback (most recent call last):
localstack_container            |   File "/tmp/localstack/lambda_script_l_656d31ba.py", line 43, in lambda_handler
localstack_container            |     count = get_rows_by_status_and_age(session, [1,6,7], 20)
localstack_container            |   File "/tmp/localstack/lambda_script_l_656d31ba.py", line 29, in get_rows_by_status_and_age
localstack_container            |     return q.scalar()
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3312, in scalar
localstack_container            |     ret = self.one()
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3282, in one
localstack_container            |     ret = self.one_or_none()
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3251, in one_or_none
localstack_container            |     ret = list(self)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3324, in __iter__
localstack_container            |     return self._execute_and_instances(context)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/orm/query.py", line 3349, in _execute_and_instances
localstack_container            |     result = conn.execute(querycontext.statement, self._params)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/engine/base.py", line 988, in execute
localstack_container            |     return meth(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
localstack_container            |     return connection._execute_clauseelement(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/engine/base.py", line 1098, in _execute_clauseelement
localstack_container            |     else None,
localstack_container            |   File "<string>", line 1, in <lambda>
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/elements.py", line 462, in compile
localstack_container            |     return self._compiler(dialect, bind=bind, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/elements.py", line 468, in _compiler
localstack_container            |     return dialect.statement_compiler(dialect, self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 562, in __init__
localstack_container            |     Compiled.__init__(self, dialect, statement, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 319, in __init__
localstack_container            |     self.string = self.process(self.statement, **compile_kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 350, in process
localstack_container            |     return obj._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 2092, in visit_select
localstack_container            |     for name, column in select._columns_plus_names
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 2092, in <listcomp>
localstack_container            |     for name, column in select._columns_plus_names
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 1845, in _label_select_column
localstack_container            |     return result_expr._compiler_dispatch(self, **column_clause_args)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 820, in visit_label
localstack_container            |     + self.preparer.format_label(label, labelname)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 826, in visit_label
localstack_container            |     self, within_columns_clause=False, **kw
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 1105, in visit_function
localstack_container            |     ) % {"expr": self.function_argspec(func, **kwargs)}
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 1117, in function_argspec
localstack_container            |     return func.clause_expr._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 718, in visit_grouping
localstack_container            |     return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.b3651fa4/sqlalchemy/sql/compiler.py", line 968, in visit_clauselist
localstack_container            |     sep = OPERATORS[clauselist.operator]
localstack_container            | KeyError: <function comma_op at 0x7fd9677e0440>

如果我将查询修改为以下查询,它也会失败,但会出现不同的异常(我删除了function .count()):

q = session.query(Execution.id).filter(
Execution.status.in_(status_list),
Execution.updated_at < time_old)
return q.all()

最后一个例外是UnsupportedCompilationError,如下所示:

localstack_container            | 2021-08-24T09:44:20:ERROR:root: failed with error: Compiler <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7fb84801d9d0> can't render element of type <function in_op at 0x7fb8464b0b90> (Background on this error at: http://sqlalche.me/e/l7de)
localstack_container            | 2021-08-24T09:44:20:INFO:root: Error in the lambda:
localstack_container            | 2021-08-24T09:44:20:ERROR:root: Compiler <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7fb84801d9d0> can't render element of type <function in_op at 0x7fb8464b0b90> (Background on this error at: http://sqlalche.me/e/l7de)
localstack_container            | Traceback (most recent call last):
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 1281, in visit_binary
localstack_container            |     opstring = OPERATORS[operator_]
localstack_container            | KeyError: <function in_op at 0x7fb8464b0b90>
localstack_container            | 
localstack_container            | During handling of the above exception, another exception occurred:
localstack_container            | 
localstack_container            | Traceback (most recent call last):
localstack_container            |   File "/tmp/localstack/lambda_script_l_a11f462d.py", line 46, in lambda_handler
localstack_container            |     count = get_rows_by_status_and_age(session, [1,6,7], 20)
localstack_container            |   File "/tmp/localstack/lambda_script_l_a11f462d.py", line 32, in get_rows_by_status_and_age
localstack_container            |     return q.all()
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/orm/query.py", line 3168, in all
localstack_container            |     return list(self)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/orm/query.py", line 3324, in __iter__
localstack_container            |     return self._execute_and_instances(context)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/orm/query.py", line 3349, in _execute_and_instances
localstack_container            |     result = conn.execute(querycontext.statement, self._params)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/engine/base.py", line 988, in execute
localstack_container            |     return meth(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
localstack_container            |     return connection._execute_clauseelement(self, multiparams, params)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/engine/base.py", line 1098, in _execute_clauseelement
localstack_container            |     else None,
localstack_container            |   File "<string>", line 1, in <lambda>
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/elements.py", line 462, in compile
localstack_container            |     return self._compiler(dialect, bind=bind, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/elements.py", line 468, in _compiler
localstack_container            |     return dialect.statement_compiler(dialect, self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 562, in __init__
localstack_container            |     Compiled.__init__(self, dialect, statement, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 319, in __init__
localstack_container            |     self.string = self.process(self.statement, **compile_kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 350, in process
localstack_container            |     return obj._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 2117, in visit_select
localstack_container            |     text, select, inner_columns, froms, byfrom, kwargs
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 2216, in _compose_select_body
localstack_container            |     t = select._whereclause._compiler_dispatch(self, **kwargs)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 972, in visit_clauselist
localstack_container            |     c._compiler_dispatch(self, **kw) for c in clauselist.clauses
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 970, in <genexpr>
localstack_container            |     s
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 972, in <genexpr>
localstack_container            |     c._compiler_dispatch(self, **kw) for c in clauselist.clauses
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/visitors.py", line 91, in _compiler_dispatch
localstack_container            |     return meth(self, **kw)
localstack_container            |   File "/tmp/localstack/zipfile.183ee235/sqlalchemy/sql/compiler.py", line 1283, in visit_binary
localstack_container            |     raise exc.UnsupportedCompilationError(self, operator_)
localstack_container            | sqlalchemy.exc.UnsupportedCompilationError: Compiler <sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7fb84801d9d0> can't render element of type <function in_op at 0x7fb8464b0b90> (Background on this error at: http://sqlalche.me/e/l7de)

如果我只留下一个过滤器,它可以工作:

q = session.query(Execution.id).filter(
Execution.updated_at < time_old)
return q.all()

从试图理解这个问题,它似乎是一个问题与编译查询可能由于数据库依赖的语法,但我打印出sqlalchemy方言,它是mysql,就像它应该是。就像我提到的,它在除了localstack lambda之外的所有其他环境中都能很好地工作。

这似乎是由localstack环境引起的。

任何想法?谢谢。

这是不是一个答案,但一个可能的解决方案.

Localstack有一个在单独的docker中执行lambda函数的选项。使用此配置,sqlalchemy错误不会发生,并且lambda运行成功。

为了让localstack在单独的docker中调用lambda函数,在启动localstack docker时使用LAMBDA_EXECUTOR环境变量。

在docker-compose中,它看起来像这样:

services:
...
localstack:
image: localstack/localstack
environment:
...
LAMBDA_EXECUTOR: docker
LAMBDA_DOCKER_NETWORK: testing_network
...
volumes:
- /var/run/docker.sock:/var/run/docker.sock
...
container_name: localstackcontainer
...

解释:

  • LAMBDA_EXECUTOR -告诉localstack在单独的docker中运行LAMBDA_EXECUTOR

  • LAMBDA_DOCKER_NETWORK -告诉localstack将单独的docker添加到特定的docker网络。(您通常会将其添加到运行所有服务的docker网络中,以允许通过docker网络进行通信)。("testing_network"在我的例子中)

  • 卷映射/var/run/docker.sock:/var/run/docker. socksock -允许localstack docker在你的主机上启动一个新的docker。

  • 注意:Lambda代码现在将连接到localstack中的其他aws服务,使用localstack容器名称作为endpoint_url(通过docker网络连接)。在我的例子中,它是"localstackcontainer:4566"(当lambda在localstack docker内部运行时,而不是localhost:4566)。注意,这需要我将localstack容器名称从localstack_container更改为localstackcontainer,因为aws endpoint不能包含下划线。