我有一个DB表,它看起来像:
name last_successful
dataset1 2020-11-07 10:30
dataset2 2020-11-07 10:30
现在我想在气流中安排一个工作流程,它执行以下操作:
- 从数据库中读取所有数据集
name
和last_successful
- 对于每个数据集,检查对应于下一个30分钟bucket
last_successful + 30 min
的数据是否完成(假设这是一个黑匣子( - 在s3数据分区中写入一个_SUCCESS文件,对应于步骤2中找到的任何已完成的bucket,并更新数据库中的
last_successful
由于数据集彼此独立,因此可以对每个数据集并行执行步骤2+3。如何在Airflow中执行此操作?我不想为每个数据集创建单独的任务,因为数据集的列表将不断增加。
总结对评论的讨论。
创建一个DAG作为解析DB中读取记录的一部分是一种糟糕的做法。Airflow不断解析文件,并打开与数据库的连接以获取最新记录。这意味着数据库上的连接负载很大。出于这个原因,Airflow将为尝试使用这种方法的用户添加警告(请参阅问题(。当Airflow元存储后端出现警告时,任何其他DB也会出现同样的情况。
您可以通过在文件中列出数据集名称来处理您的用例。如果你将创建一个具有动态任务的DAG;分支";文件中的任何新条目。然后,您可以在一个专用的操作员中将条目与DB进行比较,以确保它是有效的。
这种方法可能类似于:
def get_file():
with open('your_file') as f:
lines = f.read().splitlines()
return lines
file_list = get_file()
with DAG(dag_id='my_dag',...
) as dag:
start_op = DummyOperator(task_id='start_task')
for dataset in file_list:
my_op = MyOperator(task_id=dataset)
start_op >> my_op
这样,每次您将新的数据集添加到文件中时,它都会自动使用MyOperator
为其添加分支