气流-任务中的并行执行



我有一个DB表,它看起来像:

name        last_successful
dataset1    2020-11-07 10:30
dataset2    2020-11-07 10:30

现在我想在气流中安排一个工作流程,它执行以下操作:

  1. 从数据库中读取所有数据集namelast_successful
  2. 对于每个数据集,检查对应于下一个30分钟bucketlast_successful + 30 min的数据是否完成(假设这是一个黑匣子(
  3. 在s3数据分区中写入一个_SUCCESS文件,对应于步骤2中找到的任何已完成的bucket,并更新数据库中的last_successful

由于数据集彼此独立,因此可以对每个数据集并行执行步骤2+3。如何在Airflow中执行此操作?我不想为每个数据集创建单独的任务,因为数据集的列表将不断增加。

总结对评论的讨论。

创建一个DAG作为解析DB中读取记录的一部分是一种糟糕的做法。Airflow不断解析文件,并打开与数据库的连接以获取最新记录。这意味着数据库上的连接负载很大。出于这个原因,Airflow将为尝试使用这种方法的用户添加警告(请参阅问题(。当Airflow元存储后端出现警告时,任何其他DB也会出现同样的情况。

您可以通过在文件中列出数据集名称来处理您的用例。如果你将创建一个具有动态任务的DAG;分支";文件中的任何新条目。然后,您可以在一个专用的操作员中将条目与DB进行比较,以确保它是有效的。

这种方法可能类似于:

def get_file():
with open('your_file') as f:
lines = f.read().splitlines()
return lines
file_list = get_file()
with DAG(dag_id='my_dag',...
) as dag:
start_op = DummyOperator(task_id='start_task')
for dataset in file_list:
my_op = MyOperator(task_id=dataset)
start_op >> my_op

这样,每次您将新的数据集添加到文件中时,它都会自动使用MyOperator为其添加分支

相关内容

  • 没有找到相关文章

最新更新