Pytest Alembic初始化数据库异步迁移



现有的帖子并没有给我提供一个有用的答案。

我正在尝试使用Pytest运行异步数据库测试(db是带有asyncpg的Postgres),我想使用我的Alembic迁移初始化我的数据库,以便我可以验证它们在此期间是否正常工作。

我的第一次尝试是这样的:

@pytest.fixture(scope="session")
async def tables():
"""Initialize a database before the tests, and then tear it down again"""
alembic_config: config.Config = config.Config('alembic.ini')
command.upgrade(alembic_config, "head")
yield
command.downgrade(alembic_config, "base")

实际上根本没有做任何事情(迁移从未应用于数据库,表也没有创建)。

Alembic的文档&Pytest-Alembic的文档说,应该通过像这样配置您的env来运行异步迁移:

async def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
asyncio.run(run_migrations_online())

但这并不能解决问题(然而它确实适用于pytest以外的生产迁移)) .

我偶然发现了一个名为pytest-alembic的库,它提供了一些内置的测试。

运行pytest --test-alembic时,我得到以下异常:

将Future附加到不同的循环

关于pytest-asyncio的GitHub存储库的一些评论建议以下fixture可能会修复它:

@pytest.fixture(scope="session")
def event_loop() -> Generator:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()

,但它没有(同样的异常仍然存在)。

接下来,我尝试手动运行upgrade测试,使用:
async def test_migrations(alembic_runner):
alembic_runner.migrate_up_to("revision_tag_here")

得到

alembic_runner.migrate_up_to("revision_tag_here"

venv/lib/python3.9/site-packages/pytest_alembic/runner.py:264: in run_connection_task返回asyncio.run(运行(引擎))

RuntimeError: asyncio.run()不能从正在运行的事件循环中调用

然而这是一个内部调用通过pytest-alembic,我自己不调用asyncio.run(),所以我不能为此应用任何在线修复(try-catch检查是否有现有的事件循环使用等)。我确信这个和我自己的没有关系asyncio.run()alembic env中定义,因为如果我添加一个断点-或者只是在它上面引发一个异常-这行实际上是从未执行

最后,我也尝试了nest-asyncio.apply(),它只是永远挂起。

还有一些博客文章建议使用这个fixture初始化用于测试的数据库表:

async with engine.begin() as connection:
await connection.run_sync(Base.metadata.create_all)

的工作的目的是创建一个数据库来运行测试,但这并没有通过迁移运行,所以这对我的情况没有帮助。

我觉得我已经尝试了所有的方法&我访问了每个文档页面,但到目前为止还没有找到。运行异步迁移测试肯定不会这么困难吧?

如果需要任何额外的信息,我很乐意提供。

我用下面的

很容易地设置并运行了它env.py—这里的主要思想是迁移可以同步运行

import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = mymodel.Base.metadata

def run_migrations_online():
connectable = context.config.attributes.get("connection", None)
if connectable is None:
connectable = AsyncEngine(
engine_from_config(
context.config.get_section(context.config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
future=True
)
)
if isinstance(connectable, AsyncEngine):
asyncio.run(run_async_migrations(connectable))
else:
do_run_migrations(connectable)

async def run_async_migrations(connectable):
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()

def do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()

run_migrations_online()
然后我添加了一个简单的db初始化脚本
init_db.py
from alembic import command
from alembic.config import Config
from sqlalchemy.ext.asyncio import create_async_engine
__config_path__ = "/path/to/alembic.ini"
__migration_path__ = "/path/to/folder/with/env.py"
cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migration_path__)

async def migrate_db(conn_url: str):
async_engine = create_async_engine(conn_url, echo=True)
async with async_engine.begin() as conn:
await conn.run_sync(__execute_upgrade)

def __execute_upgrade(connection):
cfg.attributes["connection"] = connection
command.upgrade(cfg, "head")

那么您的pytest fixture可以看起来像这样
conftest.py

...
@pytest_asyncio.fixture(autouse=True)
async def migrate():
await migrate_db(conn_url)
yield
...

注意:我不把我的迁移装置限定在测试会话中,我倾向于在每次测试后放弃和迁移。

现在Alembic提供了一个async模板,参见https://alembic.sqlalchemy.org/en/latest/cookbook.html#using-asyncio-with-alembic。

您可以使用以下命令生成异步env.py:

alembic init -t async <script_directory_here>

但是继续阅读https://alembic.sqlalchemy.org/en/latest/cookbook.html#programmatic-api-use-connection-sharing-with-asyncio,它建议将run_migrations_online更改为以下内容:

def run_migrations_online():
"""Run migrations in 'online' mode."""
connectable = config.attributes.get("connection", None)
if connectable is None:
asyncio.run(run_async_migrations())
else:
do_run_migrations(connectable)

相关内容

  • 没有找到相关文章

最新更新