气流如何对动态计算列表中的每个项目执行操作员操作



是否有方法为非硬编码列表中的每个项目执行气流运算符?对于动态计算列表中的每个项目(@task的结果(,都希望运行一个docker映像,将该项目作为环境变量传入:

for item in list_fetched_from_a_task:
run docker container passing in the item as a environment variable

这个DAG将打印sql查询的列表结果中的每个项目:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

@task
def get_all_items():
mssqlServer = MsSqlHook(mssql_conn_id="my_mssql")
raw_item_names = mssqlServer.get_records(
"""
use database_name;
select itemName from database_name.fds.items;
"""
)
item_names = []
for item_name in raw_item_names:
item_names.append(item_name[0])

return item_names # ["Item 1", "Item 2", "Item 3"]
@task
def do_thing_for_one_item(single_item):
print("Doing thing for item: " + single_item)

with DAG(dag_id="do-thing-for-each-item-process", start_date=datetime(2022, 10, 29)) as dag:
do_thing_for_one_item.expand(single_item=get_all_items())

但是,如果您想运行DockerOperator来为该列表中的每个项目运行图像,该怎么办?是否将single_item项作为环境变量传递给图像?类似于气流操作员的等效.expand(List)方法,该方法将为所提供列表中的每个项目运行操作员?

类似这样的东西(伪代码(:

for single_item in list_fetched_from_task:
do DockerOperator(
task_id='run-some-image-for-single-item',
image='some-image:latest',
docker_url='unix:///var/run/docker.sock',
network_mode='host',
environment={
"SINGLE_item": single_item
},
dag=dag
)

DockerOperator封装在PythonOperator中,并传入创建列表的任务的结果(XCom(:

@task()
def run_docker_container(context, list_fetched_from_task):
for single_item in list_fetched_from_task:
DockerOperator(
task_id='run-some-image-for-single-item',
image='some-image:latest',
docker_url='unix:///var/run/docker.sock',
network_mode='host',
environment={
"SINGLE_item": single_item
},
).execute(context=context)

你不会在UI中渲染它,因为Airflow Web服务器不可能知道它将提前运行多少任务。

最新更新