我正在开发一个自定义 Airflow 插件，它应该显示元数据数据库中的一个（自定义）表：
class CampaignModel(Base, LoggingMixin):
    """ORM model mapped to the ``campaigns`` table shown by the Campaigns plugin view.

    NOTE(review): ``Base`` is presumably Airflow's declarative base
    (``airflow.models.base.Base``) and ``ID_LEN`` its ID-length constant;
    their imports are not shown in this snippet.

    Observed issue: ``airflow initdb`` / ``airflow resetdb`` do not create
    this table just because the model class is defined.
    """

    __tablename__ = "campaigns"

    # Surrogate integer primary key.
    id = Column(Integer, primary_key=True)
    # Human-readable campaign name; the unique constraint forbids duplicates.
    campaign_name = Column(String(ID_LEN), unique=True)
class CampaignsView(ModelView):
    """Admin view over ``CampaignModel`` rows.

    NOTE(review): ``ModelView`` is presumably Flask-Admin's SQLAlchemy
    ``ModelView``, where ``page_size`` is the number of rows per list page —
    confirm the import.
    """

    page_size = 50


# The single view instance the plugin registers. ``settings.Session()`` is
# called once, when this module is imported, and that session is handed to the view.
v = CampaignsView(CampaignModel, settings.Session(), name="Campaigns")
class CampaignEditorPlugin(AirflowPlugin):
    """Airflow plugin that exposes the Campaigns admin view ``v``.

    The ``...`` lines mark plugin attributes elided from this snippet.

    NOTE(review): ``admin_views`` is presumably the Flask-Admin (legacy,
    non-RBAC UI) hook; the RBAC / FAB UI uses ``appbuilder_views`` instead —
    confirm for your Airflow version.
    """

    name = "campaigns_plugin"
    ...
    admin_views = [v]
    ...
Airflow 能正确启动并加载插件，但在执行 `airflow initdb` 或 `airflow resetdb` 时，它并不会为 `CampaignModel` 创建/注册我的新表。
如何正确注册该表，以便 SQLAlchemy 在执行 `initdb` 时创建它？
向 Airflow 元数据库添加表有两种方式：a) 使用 Alembic 迁移；b) 使用 SQLAlchemy ORM。
a) 使用 Alembic 进行迁移
这种方式需要把代码添加到 Airflow 代码仓库中。
# 1. Create a migration file
# Work from the airflow/ package directory (its migrations live in
# migrations/versions/), then let Alembic generate a new revision script
# whose file name is derived from the message below.
cd airflow_dir/airflow
alembic revision --message "create new_table table"
# 2. Add upgrade and downgrade instructions to the generated migration Python file.
## airflow_dir/airflow/migrations/versions/<revision>_create_new_table_table.py
"""Alembic migration: create ``new_table`` in the Airflow metadata database."""
import sqlalchemy as sa
from alembic import op

try:
    # Collation keyword arguments Airflow applies to its string ID columns.
    from airflow.models.base import COLLATION_ARGS
except ImportError:
    # NOTE(review): older Airflow releases may not export COLLATION_ARGS;
    # falling back to the database's default collation — confirm for your version.
    COLLATION_ARGS = {}

# revision identifiers, used by Alembic.
# Both values are placeholders: `alembic revision` writes the real ids, and
# down_revision must name the revision this migration builds on.
revision = 'xxxxxxxxx'
down_revision = 'xxxxxxxxx'
branch_labels = None
depends_on = None


def upgrade():
    """Create ``new_table`` with two non-nullable 250-character ID columns."""
    op.create_table(
        'new_table',
        sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
        sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
    )


def downgrade():
    """Drop ``new_table``, reverting :func:`upgrade`."""
    op.drop_table('new_table')
# 3. Run the migration
# Run from the same directory as step 1 (airflow_dir/airflow/). "head" upgrades
# the database to the newest revision, which should now be the new migration.
# NOTE(review): Airflow's own `airflow upgradedb` (1.10) / `airflow db upgrade`
# (2.x) presumably applies this migration as well — confirm for your version.
alembic upgrade head
b) 使用 SQLAlchemy ORM
这种方式可以在 Airflow 插件中使用，插件放在独立于 Airflow 代码仓库的实例目录中。
from airflow import settings
from airflow.models.base import ID_LEN, Base
from sqlalchemy import Column, Index, String
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session


class NewModel(Base, LoggingMixin):
    """ORM model for ``new_table``, declared on Airflow's declarative ``Base``."""

    __tablename__ = "new_table"

    # String primary key, sized like Airflow's own ID columns.
    id = Column(String(ID_LEN), primary_key=True)
    # Unique, human-readable name.
    name = Column(String(ID_LEN), unique=True)


# Create new table in Airflow db. checkfirst=True skips CREATE TABLE when the
# table already exists, so this is safe to run on every module import.
NewModel.__table__.create(settings.engine, checkfirst=True)

# Create object
new_object = NewModel(id='1', name='my name')

# Write to Airflow db.
# merge() instead of add(): this code runs each time the module is imported,
# and add() would INSERT id '1' again and violate the primary key on the second
# run. merge() inserts the row when it is absent and otherwise updates it.
with create_session() as session:
    session.merge(new_object)
    session.commit()
虽然这不是通过 `initdb` 或 `resetdb` 实现的，但只要在定义类之后添加 `CampaignModel.__table__.create(settings.engine, checkfirst=True)`，就会在加载模块时创建该表（如果它尚不存在）。