Airflow: problem creating dynamic tasks inside a TaskGroup



I am trying to create a dynamic workflow.

I am getting this Broken DAG error about duplicate task ids:

Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 430, in __init__
    task_group.add(self)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py", line 140, in add
    raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been added to the DAG

My code:

import os
import re

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup


@task
def extract(filename):
    some_extract_function

@task
def transform(item: list):
    some_transform_function

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        extract_review = extract(filename)
        data.append(extract_review)
    transformed_data_review = transform(data)

The problem only appears when I try to create the tasks dynamically inside a TaskGroup. If I remove the TaskGroup, it works fine.

I found this related issue: https://github.com/apache/airflow/issues/8057. Is there any way to fix this error, for example by dynamically creating a custom task_id? I know that is possible with the PythonOperator, but I am trying to do it with the TaskFlow API.
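For reference, this is roughly what I mean by the PythonOperator approach: each operator gets an explicit, unique task_id, so no duplicate ids are generated. This is only a minimal sketch; extract_review_file and the DATA_PATH value are placeholders, not my real code:

import os
import re

from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

DATA_PATH = "/opt/airflow/data"  # placeholder path


def extract_review_file(filename):
    # placeholder extract callable
    ...


with TaskGroup('Review') as Review:
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        PythonOperator(
            task_id=f"extract_{filename}",  # explicit, unique task_id per file
            python_callable=extract_review_file,
            op_kwargs={"filename": filename},
        )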

Thanks.

Fixed, thanks to this video.

So I solved this by dynamically creating a TaskGroup inside the TaskGroup.

Here is the code:

with TaskGroup('Review') as Review:
    data = []
    filenames = os.listdir(DATA_PATH)
    filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
    for filename in filtered_filenames:
        # A nested TaskGroup per file gives each extract task a unique
        # id prefix (Review.<filename>.extract), so the task ids no longer collide.
        with TaskGroup(filename):
            extract_review = extract(filename)
            data.append(extract_review)
    transformed_data_review = transform(data)
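For completeness, this is roughly how the fixed block sits inside a full DAG file. The dag id, schedule, start_date, and DATA_PATH below are placeholders, not the values from my actual academi_dag.py:

import os
import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup

DATA_PATH = "/opt/airflow/data"  # placeholder path


@task
def extract(filename):
    ...  # extract logic goes here

@task
def transform(item: list):
    ...  # transform logic goes here


@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False)
def academi_dag():
    with TaskGroup('Review') as Review:
        data = []
        filenames = os.listdir(DATA_PATH)
        filtered_filenames = list(filter(lambda x: re.match(r"(^review)", x), filenames))
        for filename in filtered_filenames:
            # nested group per file -> unique task ids like Review.<filename>.extract
            with TaskGroup(filename):
                data.append(extract(filename))
        transform(data)


academi_dag()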

Latest update