I am running the DAG below. It imports functions from dash_workers.py (not included yet; would including it help?) and implements those functions as tasks defined with PythonOperator. I am using Airflow version 1.8.0:
from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime(2017, 7, 18),
    'schedule_interval': None
}

DAG = af_models.DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions',
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)
This results in:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/pyth on3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 675, in _load
File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
File "/Users/aaronpolhamus/airflow/dags/dash_dag.py", line 47, in <module>
upload_transactions.set_upstream(get_id_creds)
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2478, in set_upstream
self._set_relatives(task_or_task_list, upstream=True)
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2458, in _set_relatives
task.append_only_new(task._downstream_task_ids, self.task_id)
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2419, in append_only_new
''.format(**locals()))
airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 28, in <module>
args.func(args)
File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 573, in test
dag = dag or get_dag(args)
File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 126, in get_dag
'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: dash_preproc. Either the dag did not exist or it failed to parse.
The application here is that I use the function get_id_creds to extract a list of IDs from a SQL table and then generate a detailed data profile for each ID. Both functions use MySqlHook internally, and I have tested each function/task independently to make sure it produces the expected behavior in isolation (it does).
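dash_workers.py is not included here, but roughly speaking get_id_creds does something like the sketch below (details simplified; the connection ID, table name and query are placeholders, not the real code):

from airflow.hooks.mysql_hook import MySqlHook

def get_id_creds(**kwargs):
    """Pull the list of IDs from MySQL and dump it to /tmp/ids.txt."""
    hook = MySqlHook(mysql_conn_id='mysql_default')  # placeholder connection ID
    rows = hook.get_records('SELECT id FROM some_table')  # placeholder query
    with open('/tmp/ids.txt', 'w') as outfile:
        outfile.write('\n'.join(str(row[0]) for row in rows))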
The crux of the error seems to be the line airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered. This seems to indicate that on the first pass through the loop the task gets "registered", and then on the second pass the parser complains that it has already done so. This example script makes what I am doing here look easy: just embed the downstream task inside a for loop. Not sure why this fails.
I have set up local parallelism with LocalExecutor. My understanding is that, if I can get this working, I can run multiple data-profile-generation jobs in parallel on the same machine.
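For reference, local parallelism is enabled through the executor setting in airflow.cfg; a rough sketch of the relevant [core] entries (the parallelism value here is only an example, not my actual setting):

[core]
# run task instances as local subprocesses rather than sequentially
executor = LocalExecutor
# example cap on how many task instances may run at once
parallelism = 8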
Where is this error coming from, and how can I get this script working?
Not directly related to your problem, but you don't need to import airflow.models; in your case just do from airflow.models import DAG and make the necessary changes.
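A minimal sketch of that simplification, reusing your dag_id and default_args (the DAG instance is bound to a different name, dag, so it no longer shadows the imported DAG class):

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'start_date': datetime(2017, 7, 18),
    'schedule_interval': None
}

# DAG is now the class itself, so bind the instance to a different name
dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args
)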
You point to an example that shows a DAG with dynamically generated PythonOperator tasks, but you don't seem to be applying it fully.
In your case you have to assign the task names dynamically, so that each new task gets registered under its own name and shows up in the webserver.
for idx, uid in enumerate(ids):
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions_' + str(idx),
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
By appending the index of the current uid to the task name, each task gets a unique name. I did not use uid itself for this because I don't know whether every element in your list is unique. If it is, you can drop enumerate() and use uid directly.
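In that case the loop would look roughly like this, reusing the imports, the DAG object and the get_id_creds task from your script (this assumes the uid values only contain characters that are valid in a task_id, such as letters, digits, underscores and dashes):

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        # the uid itself makes each task_id unique
        task_id='upload_transactions_' + str(uid),
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)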
I hope this helps. Cheers!