DAG that reads CSV rows as input to operators



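I have a CSV file with data in various columns that is used by Python functions called from PythonOperators. My DAG pipeline is set up so that I want to read the CSV row by row and feed each row's values into my operators. How do I make my DAG iterate over the CSV rows?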

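If you want to read the CSV file and process each row in its own task, you can read the CSV in one task and then use dynamic task mapping (available since Airflow 2.3.0) to process the rows. `processing.expand(...)` creates one mapped task instance per element of the list returned by `read_csv`: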

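```python
import csv
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

CSV_PATH = "/path/to/data.csv"  # hypothetical path: point this at your own CSV file

# dag_id may not contain spaces, so "dag id" is replaced with a valid identifier.
with DAG(dag_id="process_csv_rows", start_date=datetime(2023, 1, 1), catchup=False) as dag:

    @task
    def read_csv():
        # Load the CSV file and prepare the data to process: one dict per row,
        # keyed by the header names (values are strings), e.g.
        # [{"row": "1", "x": "1"}, {"row": "2", "x": "1"}, {"row": "3", "x": "2"}]
        with open(CSV_PATH, newline="") as f:
            return list(csv.DictReader(f))

    @task
    def processing(data_to_process):
        # Implement your processing function; each mapped task instance receives one row.
        print(f"row data: {data_to_process}")

    data_to_process = read_csv()
    processing.expand(data_to_process=data_to_process)
```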
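`csv.DictReader` yields every value as a string. If your processing functions expect numbers, as in the `{"row": 1, "x": 1}` example, one option is to keep the parsing in a plain Python function that can be tested without Airflow. The sketch below assumes that digit-only fields should become `int` and everything else stays a string (negative numbers and decimals are left as strings); the name `rows_from_csv` is hypothetical.

```python
import csv
import io


def rows_from_csv(text):
    """Parse CSV text into one dict per row, converting digit-only fields to int.

    Hypothetical helper: adjust the conversion rule to match your columns.
    """
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        rows.append(
            {key: int(value) if value.isdigit() else value for key, value in record.items()}
        )
    return rows


sample = "row,x\n1,1\n2,1\n3,2\n"
print(rows_from_csv(sample))
# [{'row': 1, 'x': 1}, {'row': 2, 'x': 1}, {'row': 3, 'x': 2}]
```

Inside the `read_csv` task you would then return `rows_from_csv(f.read())` instead of the raw `DictReader` rows. The returned list of dicts is JSON-serializable, so it can be passed through XCom to the mapped `processing` tasks.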
