现有的帖子并没有给我提供一个有用的答案。
我正在尝试使用Pytest运行异步数据库测试(db是带有asyncpg的Postgres),我想使用我的Alembic迁移初始化我的数据库,以便我可以验证它们在此期间是否正常工作。
我的第一次尝试是这样的:
@pytest.fixture(scope="session")
async def tables():
"""Initialize a database before the tests, and then tear it down again"""
alembic_config: config.Config = config.Config('alembic.ini')
command.upgrade(alembic_config, "head")
yield
command.downgrade(alembic_config, "base")
实际上根本没有做任何事情(迁移从未应用于数据库,表也没有创建)。
Alembic的文档&Pytest-Alembic的文档说,应该通过像这样配置您的env
来运行异步迁移:
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
asyncio.run(run_migrations_online())
但这并不能解决问题(然而它确实适用于pytest以外的生产迁移)) .
我偶然发现了一个名为pytest-alembic
的库,它提供了一些内置的测试。
运行pytest --test-alembic
时,我得到以下异常:
将Future附加到不同的循环
关于pytest-asyncio
的GitHub存储库的一些评论建议以下fixture可能会修复它:
@pytest.fixture(scope="session")
def event_loop() -> Generator:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
,但它没有(同样的异常仍然存在)。
接下来,我尝试手动运行upgrade
测试,使用:
async def test_migrations(alembic_runner):
alembic_runner.migrate_up_to("revision_tag_here")
得到
alembic_runner.migrate_up_to("revision_tag_here"
venv/lib/python3.9/site-packages/pytest_alembic/runner.py:264: in run_connection_task返回asyncio.run(运行(引擎))
RuntimeError: asyncio.run()不能从正在运行的事件循环中调用
然而这是一个内部调用通过pytest-alembic
,我自己不调用asyncio.run()
,所以我不能为此应用任何在线修复(try-catch
检查是否有现有的事件循环使用等)。我确信这个和我自己的没有关系asyncio.run()
在alembic env
中定义,因为如果我添加一个断点-或者只是在它上面引发一个异常-这行实际上是从未执行。
最后,我也尝试了nest-asyncio.apply()
,它只是永远挂起。
还有一些博客文章建议使用这个fixture初始化用于测试的数据库表:
async with engine.begin() as connection:
await connection.run_sync(Base.metadata.create_all)
的工作的目的是创建一个数据库来运行测试,但这并没有通过迁移运行,所以这对我的情况没有帮助。
我觉得我已经尝试了所有的方法&我访问了每个文档页面,但到目前为止还没有找到。运行异步迁移测试肯定不会这么困难吧?
如果需要任何额外的信息,我很乐意提供。
我用下面的
很容易地设置并运行了它env.py
—这里的主要思想是迁移可以同步运行
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = mymodel.Base.metadata
def run_migrations_online():
connectable = context.config.attributes.get("connection", None)
if connectable is None:
connectable = AsyncEngine(
engine_from_config(
context.config.get_section(context.config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
future=True
)
)
if isinstance(connectable, AsyncEngine):
asyncio.run(run_async_migrations(connectable))
else:
do_run_migrations(connectable)
async def run_async_migrations(connectable):
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
run_migrations_online()
然后我添加了一个简单的db初始化脚本init_db.py
from alembic import command
from alembic.config import Config
from sqlalchemy.ext.asyncio import create_async_engine
__config_path__ = "/path/to/alembic.ini"
__migration_path__ = "/path/to/folder/with/env.py"
cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migration_path__)
async def migrate_db(conn_url: str):
async_engine = create_async_engine(conn_url, echo=True)
async with async_engine.begin() as conn:
await conn.run_sync(__execute_upgrade)
def __execute_upgrade(connection):
cfg.attributes["connection"] = connection
command.upgrade(cfg, "head")
那么您的pytest fixture可以看起来像这样conftest.py
...
@pytest_asyncio.fixture(autouse=True)
async def migrate():
await migrate_db(conn_url)
yield
...
注意:我不把我的迁移装置限定在测试会话中,我倾向于在每次测试后放弃和迁移。
现在Alembic提供了一个async模板,参见https://alembic.sqlalchemy.org/en/latest/cookbook.html#using-asyncio-with-alembic。
您可以使用以下命令生成异步env.py
:
alembic init -t async <script_directory_here>
但是继续阅读https://alembic.sqlalchemy.org/en/latest/cookbook.html#programmatic-api-use-connection-sharing-with-asyncio,它建议将run_migrations_online
更改为以下内容:
def run_migrations_online():
"""Run migrations in 'online' mode."""
connectable = config.attributes.get("connection", None)
if connectable is None:
asyncio.run(run_async_migrations())
else:
do_run_migrations(connectable)