气流插件 - 创建自己的模型并使用气流元数据库来存储插件特定数据



我正在尝试编写一个气流插件,该插件添加了菜单按钮,新视图和api端点。此外,我需要在数据库表中读取和存储一些数据,当 airflow 拾取它时,插件应该创建这些数据。

当我有 2 个文件时出现问题,让我们将它们命名为 view.pymymodel.py。 在 view.py 中定义了我的整个视图,菜单按钮等。

mymodel.py 中,只有sqlalchemy模型(只是相关的行和剥离的表定义 - 见下文)

mymodel.py

from airflow.models.base import Base
class someDataBase(Base):
"""
Create Model someDataBase
"""
print('create table someDataBaseTableName')
__tablename__ = "someDataBaseTableName"
id = Column(Integer, primary_key=True)

在 view.py中,我导入了文件:

from someplugin.models.mymodel import someDataBase

文件夹结构:

├── plugins
│   └── someplugin
│       ├── __init__.py
│       ├── models
│       │   ├── mymodel.py
│       └── views
│           ├── __init__.py
│           └── view.py

这将引发以下错误消息:

{{plugins_manager.py:146}} 错误 - 表"someDataBaseTableName"已为此元数据实例定义。 指定"extend_existing=True"以重新定义现有 Table 对象上的选项和列。

(顺便说一句:extend_existing=真不能解决问题:()

我认为问题是气流通过plugin_manager加载插件,并且导入语句再次加载 someDataBase 类。我认为如果我在某个数据库类中添加一些日志记录(让我们将其命名为"mylogging" - 见下文),可以确认这种行为。

airflow  | create table someDataBaseTableName
airflow  | create table someDataBaseTableName

有点修复,但不是一个令人满意的修复:如果我直接在 view.py 中定义我的 someDataBase 模型,它就可以工作。

非常欢迎有关如何解决此问题的任何建议。谢谢

我正在复制我在sqlalchemy.exc.InvalidRequestError中给出的答案:表已经定义,以防有人在使用气流时绊倒这个问题。

我遇到了同样的问题,结果models.py被导入了两次,__name__不同。

要调试它,请放置一个

import sys
from pprint import pprint
pprint(list(sys.modules.keys())) # This line prints all imported modules

在相关位置并查找所有models实例。就我而言,我有类似的东西

[...,
'models',
...,
'package_name.models']

因此,在package_name/models.py中,我用

if __name__ != 'models':
# original models.py code #

它解决了重复的负载。

我认为有一种更pythonic的方法来解决问题,它与告诉flask不要models.py视为模块或脚本(不确定哪个是正确的)在某处的__init__.py文件中有关,但即使完成了所有阅读,我仍然对这些整个恶作剧的工作原理感到非常困惑。

尝试改用包flask_appbuilder基类。

from airflow.models.base import Base替换为from flask_appbuilder import Base

它帮助我解决了这个问题。

最新更新