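Is there a way to run an Airflow operator for each item in a list that isn't hard-coded? For each item in a dynamically computed list (the result of a @task), I want to run a Docker image and pass that item to it as an environment variable: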
for item in list_fetched_from_a_task:
    run docker container passing in the item as an environment variable
This DAG prints each item in the list returned by a SQL query:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook


@task
def get_all_items():
    # Query the item names from SQL Server through the "my_mssql" connection
    mssqlServer = MsSqlHook(mssql_conn_id="my_mssql")
    raw_item_names = mssqlServer.get_records(
        """
        use database_name;
        select itemName from database_name.fds.items;
        """
    )
    # get_records returns one tuple per row; keep only the first column
    item_names = [item_name[0] for item_name in raw_item_names]
    return item_names  # ["Item 1", "Item 2", "Item 3"]


@task
def do_thing_for_one_item(single_item):
    print("Doing thing for item: " + single_item)


with DAG(dag_id="do-thing-for-each-item-process", start_date=datetime(2022, 10, 29)) as dag:
    # Dynamic task mapping: one do_thing_for_one_item instance per returned item
    do_thing_for_one_item.expand(single_item=get_all_items())
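But what if you want to use a DockerOperator to run an image for each item in that list, passing single_item to the image as an environment variable? Is there an operator equivalent of the .expand(List) method that runs the operator once for each item in the provided list?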
Something like this (pseudocode):
for single_item in list_fetched_from_task:
    do DockerOperator(
        task_id='run-some-image-for-single-item',
        image='some-image:latest',
        docker_url='unix:///var/run/docker.sock',
        network_mode='host',
        environment={
            "SINGLE_item": single_item
        },
        dag=dag
    )
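Whatever ends up launching the container, each run needs its own `environment` mapping built from one list item. A minimal sketch of that reshaping step in plain Python, using a hypothetical helper `build_environments` and the `SINGLE_item` key from the pseudocode above:

```python
from typing import Dict, List


def build_environments(items: List[str], key: str = "SINGLE_item") -> List[Dict[str, str]]:
    """Return one environment mapping per item (hypothetical helper, not an Airflow API)."""
    # Each dict is what a single container run would receive as its environment
    return [{key: item} for item in items]


if __name__ == "__main__":
    print(build_environments(["Item 1", "Item 2", "Item 3"]))
```

A list of dicts like this is the shape you would feed to anything that maps over the `environment` argument, one dict per container.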
Wrap the DockerOperator in a PythonOperator (here, a @task) and pass in the result (XCom) of the task that builds the list:
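from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.docker.operators.docker import DockerOperator


@task()
def run_docker_container(list_fetched_from_task):
    # A positional "context" parameter would receive the list instead, so fetch the context explicitly
    context = get_current_context()
    for i, single_item in enumerate(list_fetched_from_task):
        # The operator is never added to the DAG; it is only executed in-process, once per item
        DockerOperator(
            task_id=f'run-some-image-for-item-{i}',
            image='some-image:latest',
            docker_url='unix:///var/run/docker.sock',
            network_mode='host',
            environment={
                "SINGLE_item": single_item
            },
        ).execute(context=context)

# Inside the DAG: run_docker_container(get_all_items())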
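The individual container runs won't be rendered as separate tasks in the UI, because the Airflow webserver cannot know in advance how many of them will run; they all happen inside the single run_docker_container task.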