每个运营商的气流DAG娱乐



我们正在使用Airflow 2.1.4并在Kubernetes上运行。

我们已经为web服务器、调度器分离了pod,并且我们正在使用Kubernetes执行器

我们正在使用各种运算符,如PythonOperatorKubernetesPodOperator等。

我们的设置可处理约2K个客户(企业(,每个客户都有自己的DAG。

我们的代码看起来像:

def get_customers():
logger.info("querying database to get all customers")
return sql_connection.query(SELECT id, name, offset FROM customers_table)

customers = get_customers()
for id, name, offset in customers:
dag = DAG(
dag_id=f"{id}-{name}",
schedule_interval=offset,
)
with dag:
first = PythonOperator(..)
second = KubernetesPodOperator(..)
third = SimpleHttpOperator(..)
first >> second >> third
globals()[id] = dag

在上面的代码片段中,我们得到了一个简化版本,但DAG中有几十个运算符(而不仅仅是三个(。

问题是,对于每个DAG中的每个操作符,我们都会看到querying database to get all customers日志,这意味着我们查询数据库的方式比我们想要的要多

数据库不经常更新,我们每天只能更新两次DAG。我知道DAG被保存在元数据数据库或其他什么中。。

  • 有没有一种方法可以只通过调度器一次构建这些DAG,而不是每个操作员都这样做
  • 我们是否应该更改设计以支持我们的多租户需求?还有比这更好的选择吗

在我们的案例中,约60名操作员X约2000名客户=约120000次数据库查询。

是的,这完全是意料之中的事。DAG由Airflow定期解析(默认为30秒(,因此任何顶层代码(在解析文件期间执行的代码,而不是运算符的"执行"方法(都会在那时执行。

简单的答案(以及最佳实践(是";不要在DAG的顶级代码中使用任何繁重的操作";。特别是不要使用DB查询。但是,如果你想要一些更具体的答案和可能的处理方法,在关于最佳实践的Airflow文档中有专门的章节:

  • 这就是为什么顶级代码应该是";"轻";https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-级别python代码

  • 这是关于你可能使用的策略来避免";"重";当您像在您的案例中那样执行动态DAG生成时,顶级代码中的操作:https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#dynamic-dag代

简而言之,有三种建议的方法:

  1. 使用env变量
  2. 通过外部脚本自动(定期(从数据库中生成一个配置文件(例如.json(,并将其放在DAG旁边,然后由DAG从那里读取json文件,而不是使用sql查询
  3. 动态生成许多DAG-python文件(例如使用JINJA的示例(也会使用外部脚本自动并定期生成

我相信你可以用2(或3(来实现你的目标。

最新更新