使用XCOM中任务的JSON表示在Airflow中运行时/动态生成任务



我正在评估像Airflow这样的框架,该框架能够在运行工作流时在运行时构建任务的动态图,即在启动工作流之前,我不知道具体的任务及其依赖关系,我只知道图的级别数。

我开始玩Airflow,我使用XCom来保持图的状态,如下所述:在气流中创建动态工作流的正确方法

我还通过在XCom行中存储包含任务依赖关系描述的JSON片段来扩展这种方法,例如:

{
"key": "first_file",
"tasks" :
[
{
"task_id" : "third_task",
"dependencies" : ["first_task", "second_task"]
}
]
}

请注意,我真的不需要重新运行我的DAG:我的DAG是由外部调度的,一旦我的第一个DagRun完成,之后就不会删除/删除/修改任何新任务。如果需要重新运行,我会创建一个新的DAG。

我使用的技术如下:我用两个任务创建DAG,一个是传感器(这确保DagRun始终处于运行状态直到结束(

class WaitTasksSensor(BaseSensorOperator):
...
def poke(self, context):
files = os.list_dir(MY_DIR)
for f in files:
filename = os.path.join(MY_DIR, f)
json_file = open(filename).read()
json_dict = json.loads(json_file)
key = json_dict["key"]
self.xcom_push(context, key + "_" + self.dag_id, json_file)
# This sensor completes successfully only when the "end" task appears in the graph
last_task_id = "end_" + self.dag_id
return last_task_id in self.dag.task_ids

def create_dags(dag_id):
with DAG(dag_id, schedule_interval=None):
first = DummyOperator(task_id="first_" + dag_id)
wait_sensor = WaitTasksSensor(task_id="wait_sensor_" + dag_id, mode="reschedule")
first >> wait_sensor
pull_tasks(wait_sensor) # Code below
dag = create_dags("dag_1")

当Sensor推送表示任务及其依赖关系的JSON文件(这些文件一直在文件夹中(时,我尝试在DAG代码中从XCom中提取任务。

def pull_tasks(previous_task):
current_dag = previous_task.dag
dag_id = current_dag.dag_id
last_run = current_dag.get_last_dagrun(include_externally_triggered=True)

if not last_run:
return
last_run_date = last_run.execution_date
task_instances = previous_task.get_task_instances(start_date=last_run_date)
if not task_instances:
return
last_task_instance = task_instance[-1]
json_ids = [...]
for json_id in json_ids:
task_graph_json = last_task_instance.xcom_pull(task_ids=previous_task.task_id,
key=json_id + "_" + dag_id,
dag_id=dag_id)

if task_graph:
task_graph_deserialized = json.loads(task_graph_json)
tasks = task_graph_deserialized["tasks"]
create_dynamic_tasks(dag, task_dicts)
def create_dynamic_tasks(dag, task_dicts):
dag_id = dag.dag_id
for task_dict in task_dicts:
task = DummyOperator(task_id=task_id + "_" + dag_id,
dag=dag)
dependencies = task_dict["dependencies"]

for predecessor_id in dependencies:
predecessor = dag.get_task(predecessor_id + "_" + dag_id)
predecessor >> task

我的问题是:对于这样的用例,Airflow是否是有效的工具?还是我将其与主要用例(即,具有静态任务的固定工作流不是在运行时生成的(拉伸得太远了?

比方说,这种方法可以扩展到数万个DAG和数十万个任务吗?或者有没有其他类似的工具可以更容易地实现这一点?

您的问题似乎与此问题相似。在那里的答案中,如果你真的必须使用Airflow,我建议一个丑陋的解决方案。

然而,您的问题的答案是:我建议您查看Argo工作流。由于它完全在Kubernetes上运行,因此它也非常容易扩展。

最新更新