Airflow 无法从 initdb 上的自定义数据模型创建表



我正在开发一个自定义气流插件,它应该显示元数据数据库中的(自定义(表

class CampaignModel(Base, LoggingMixin):
    __tablename__ = "campaigns"
    id = Column(Integer, primary_key=True)
    campaign_name = Column(String(ID_LEN), unique=True)
class CampaignsView(ModelView):
    page_size = 50
v = CampaignsView(CampaignModel, settings.Session(), name="Campaigns")
class CampaignEditorPlugin(AirflowPlugin):
    name = "campaigns_plugin"
    ...
    admin_views = [v]
    ...

气流正确启动并加载插件。但它不会在airflow initdbairflow resetdb上以CampaignModel创建/注册我的新表。

如何正确注册表,以便 SQLAlchemy 在 initdb 上创建它?

您可以使用 a( Alembic 迁移或 b( 向 Airflow db 添加一个表。SQLAlchemy ORM.

a( 使用 Alembic 进行迁移

将代码添加到气流存储库。

# 1. Create a migration file
cd airflow_dir/airflow/
alembic revision -m "create new_table table"
# 2. Add upgrade and downgrade instructions to the generated migration Python file.
## airflow_dir/airflow/migrations/versions/0a2a5b66e19d_add_new_table_table.py
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'xxxxxxxxx'
down_revision = 'xxxxxxxxx'
branch_labels = None
depends_on = None

def upgrade():
    op.create_table('new_table',
        sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
        sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False)
def downgrade():
    op.drop_table('new_table')

# 3. Run the migration
alembic upgrade head

b( SQLAlchemy ORM

可以在与气流存储库分开的实例目录上的气流插件中使用。

from airflow import settings
from sqlalchemy import Column, Index, String
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session
class NewModel(Base, LoggingMixin):
    __tablename__ = "new_table"
    id = Column(String(ID_LEN), primary_key=True)
    name = Column(String(ID_LEN), unique=True)

# Create new table in Airflow db
NewModel.__table__.create(settings.engine, checkfirst=True)
# Create object
new_object = NewModel(id='1', name='my name')
# Write to Airflow db
with create_session() as session:
    session.add(new_object)
    session.commit()

虽然不是通过initdbresetdb但在定义类后添加CampaignModel.__table__.create(settings.engine, checkfirst=True)会导致在加载模块时创建表(如果它尚不存在(。

最新更新