Unable to get the id from the database after an insert with SQLAlchemy



Given this model:

    class Pipeline(Base):
        __tablename__ = "pipeline"
        pipeline_id = Column(String, primary_key=True)
        pipeline_version_id = Column(Integer, primary_key=True, autoincrement=True)
        is_active = Column(Boolean, nullable=False)
        update_ts = Column(DateTime, nullable=False)
        config = Column(TEXT(), nullable=True)
        parser_version = Column(String, nullable=True)

The insert code:

    async with self.sessionFactory() as session:
        async with session.begin():
            query = select(Pipeline).filter_by(pipeline_id=pipeline_id).filter_by(is_active=True)
            result = await session.execute(query)
            active_pipelines = result.scalars().all()
            for entry in active_pipelines:
                entry.is_active = False
                self.logger.info(f"Marking pipeline/version {entry.pipeline_id}/{entry.pipeline_version_id} inactive")
            self.logger.info(f"Creating update_pipeline:  {updated_pipeline}")
            session.add(updated_pipeline)
            # Read inside the transaction, right after add() and before any flush.
            pipeline_version_id = updated_pipeline.pipeline_version_id
            self.logger.info('adding updated pipeline with version id {id}'.format(id=str(pipeline_version_id)))

When I run this, pipeline_version_id comes back null.

[Edit]

Here are the other things I have tried:

    async with self.sessionFactory() as session:
        async with session.begin():
            query = select(Pipeline).filter_by(pipeline_id=pipeline_id).filter_by(is_active=True)
            result = await session.execute(query)
            active_pipelines = result.scalars().all()
            for entry in active_pipelines:
                entry.is_active = False
                self.logger.info(f"Marking pipeline/version {entry.pipeline_id}/{entry.pipeline_version_id} inactive")
            self.logger.info(f"Creating update_pipeline:  {updated_pipeline}")
            session.add(updated_pipeline)
    # Read after both context managers have exited (session closed).
    pipeline_version_id = updated_pipeline.pipeline_version_id
    self.logger.info('adding updated pipeline with version id {id}'.format(id=str(pipeline_version_id)))

Result: Failed to update DB. Error: Instance <Pipeline at 0x110c31da0> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)

    async with self.sessionFactory() as session:
        async with session.begin():
            query = select(Pipeline).filter_by(pipeline_id=pipeline_id).filter_by(is_active=True)
            result = await session.execute(query)
            active_pipelines = result.scalars().all()
            for entry in active_pipelines:
                entry.is_active = False
                self.logger.info(f"Marking pipeline/version {entry.pipeline_id}/{entry.pipeline_version_id} inactive")
            self.logger.info(f"Creating update_pipeline:  {updated_pipeline}")
            session.add(updated_pipeline)
        # Read after the transaction block has committed, session still open.
        pipeline_version_id = updated_pipeline.pipeline_version_id
        self.logger.info('adding updated pipeline with version id {id}'.format(id=str(pipeline_version_id)))

Result: Failed to update DB. Error: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/14/xd2s)

How can I get the ID of the inserted record without running another query?

These are my dependencies:

    python = "^3.7"
    setuptools = "<58"
    fastapi = "^0.54.1"
    gunicorn = "<20.0"
    uvicorn = "^0.11.5"
    sqlalchemy = "^1.4.25"
    apache-airflow = "1.10.10"
    psycopg2-binary = "^2.9.1"

This is how I create the session factory:

    def create_session_factory(db_conn=None):
        #
        # Using https://rogulski.it/blog/sqlalchemy-14-async-orm-with-fastapi/
        #
        if not db_conn:
            db_conn = get_default_parser_db_conn_str()
        global _engine
        if not _engine:
            _engine = create_async_engine(db_conn, **DB_ENGINE_OPTIONS)
        sessionFactory = sessionmaker(_engine, class_=AsyncSession)
        sessionFactory.configure()
        return sessionFactory

And DB_ENGINE_OPTIONS:

    DB_ENGINE_OPTIONS = {
        "poolclass": QueuePool,
        "pool_size": 10,
        "max_overflow": 50,
        "pool_recycle": 3600,
        "pool_timeout": 30,
    }

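Thanks for your help.

This is my first time making async DB calls in Python, so I am sure I am missing something.

Thank you all for your answers.

In the end, adding a session.flush() call (awaited, since the session is an AsyncSession) after session.add(obj) gave me what I needed.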
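
As far as I understand it, the reason this works is that add() only registers the object with the session; the INSERT is not sent until a flush, and that is when the database-generated key is copied back onto the object. Below is a minimal sketch of that idea using only the standard-library sqlite3 module. ToySession, its add() and flush() methods, the simplified Pipeline dataclass, and the single-column-key table are all hypothetical stand-ins for illustration, not SQLAlchemy's API or my real schema.

```python
import sqlite3
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Pipeline:
    # Hypothetical stand-in for the ORM model; the version id is database-generated.
    pipeline_id: str
    pipeline_version_id: Optional[int] = None


class ToySession:
    """Toy unit of work: add() only queues, flush() emits the INSERT."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.pending: List[Pipeline] = []

    def add(self, obj: Pipeline) -> None:
        # Nothing is sent to the database yet, so no id exists.
        self.pending.append(obj)

    def flush(self) -> None:
        # Send the queued INSERTs and copy each generated key onto its object.
        for obj in self.pending:
            cur = self.conn.execute(
                "INSERT INTO pipeline (pipeline_id) VALUES (?)",
                (obj.pipeline_id,),
            )
            obj.pipeline_version_id = cur.lastrowid
        self.pending.clear()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pipeline ("
    "pipeline_version_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "pipeline_id TEXT NOT NULL)"
)

session = ToySession(conn)
updated_pipeline = Pipeline(pipeline_id="demo")
session.add(updated_pipeline)
id_before_flush = updated_pipeline.pipeline_version_id  # nothing inserted yet

session.flush()
id_after_flush = updated_pipeline.pipeline_version_id  # filled in by the INSERT

conn.rollback()  # the id was known without committing or issuing a SELECT
conn.close()
```

With the real AsyncSession, the equivalent step is `await session.flush()` placed after `session.add(updated_pipeline)` and before reading `updated_pipeline.pipeline_version_id`, while still inside the `session.begin()` block.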
