Maybe this question isn't worded in the best way. Basically, what I want to do is: build a DAG that iterates over a list of SQL files and executes each of them with a BigQueryOperator().
However, some of the SQL files reference tables that don't exist in BQ, and I want to capture those errors: they should not be printed in the logs, and the tasks should not be marked as failed. Instead, the errors should be added to a dictionary and displayed at the end by another task.
As you can see from the code below, I tried using try & except, but it doesn't work. The tasks get created correctly and run the SQL fine, but when a query fails the error is printed in the logs right away, the task is marked as failed, and the try & except clause is ignored entirely. The last task, print_errors(), also prints nothing, because the dictionary stays empty. So it seems I cannot influence an Airflow operator once it has been instantiated: the Python logic wrapped around it is ignored (the try/except runs only once, at DAG parse time, while the SQL is executed much later on a worker; see the sketch right after the code).

My current code looks like this:
Import some libraries:
import airflow
from airflow import models
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
Get some variables + the list of SQL files (hardcoded for now, but later it will be fetched from GCS):
BQ_PROJECT = models.Variable.get('bq_datahub_project_id').strip()
BQ_DATASET_PREFIX = models.Variable.get('datasetPrefix').strip()
AIRFLOW_BUCKET = models.Variable.get('airflow_bucket').strip()
CODE = models.Variable.get('gcs_code').strip()
COMPOSER = '-'.join(models.Variable.get('airflow_bucket').strip().split('-')[2:-2])

# Placeholder list; hardcoded for now, to be fetched from GCS later
sql_files = ['sql/query_1.sql', 'sql/query_2.sql']

# Minimal placeholder for the default arguments referenced by the DAG below
default_dag_args = {
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
}
The DAG:
with models.DAG(dag_id='run-sql-files',
                schedule_interval='0 */3 * * *',
                user_defined_macros={"COMPOSER": COMPOSER},
                default_args=default_dag_args,
                concurrency=2,
                max_active_runs=1,
                catchup=False) as dag:

    # provide_context=True passes the task context as keyword arguments,
    # so the callable must accept **kwargs
    def print_errors(**kwargs):
        if other_errors:
            for error, files in other_errors.items():
                print("Warning: " + error + " for the following SQL files:")
                for file in files:
                    print(file)

    t0 = DummyOperator(
        task_id='Start'
    )

    t2 = DummyOperator(
        task_id='End',
        trigger_rule='all_done'
    )

    other_errors = {}
    for i, sql_file in enumerate(sql_files):
        try:
            full_path_sql = AIRFLOW_BUCKET + sql_file
            t1 = BigQueryOperator(
                task_id='sql_' + str(i),
                params={"datahubProject": BQ_PROJECT, "datasetPrefix": BQ_DATASET_PREFIX},
                sql=sql_file,
                use_legacy_sql=False,
                location='europe-west3',
                labels={"composer_id": COMPOSER, "dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"},
                dag=dag
            )
            t0 >> t1 >> t2
        except Exception as e:
            other_errors[str(e)] = other_errors.get(str(e), []) + [sql_file]

    t3 = PythonOperator(
        task_id='print_errors',
        python_callable=print_errors,
        provide_context=True,
        dag=dag)

    t2 >> t3
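Why the try/except above never fires: the DAG file runs at parse time, when the scheduler imports it, and constructing a BigQueryOperator does not contact BigQuery at all. The SQL only runs later, when a worker calls the operator's execute() method. A minimal sketch of the two phases (the failing query is a made-up example, not from the original code):

# Parse time: this runs when the scheduler imports the DAG file.
try:
    t1 = BigQueryOperator(
        task_id='sql_0',
        sql='SELECT * FROM missing_table',  # hypothetical failing query
        use_legacy_sql=False,
    )
except Exception:
    pass  # never reached for query errors: __init__ only builds the task object

# Run time: Airflow later calls t1.execute(context) on a worker. That is
# where the "table not found" error is raised, outside the parse-time
# try/except, so the task is marked as failed.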
To solve your problem, I suggest using a PythonOperator together with the BigQuery Python client, which gives you more flexibility and makes it much easier to catch errors:
import logging

from google.cloud import bigquery


def execute_queries():
    client = bigquery.Client()
    for sql_file in sql_files:
        # If sql_file is a path rather than the query text itself, first
        # recover the SQL string from the file (see the GCS sketch below).
        query = sql_file
        query_job = client.query(query)
        try:
            query_job.result()
        except Exception:
            # query_job.errors can be None for non-job failures, hence the guard
            for error in query_job.errors or []:
                # apply your logic, e.g. collect the error instead of failing
                logging.error('ERROR: {}'.format(error['message']))
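If sql_files holds GCS paths instead of the query text itself, the string has to be fetched first. A minimal sketch using the google-cloud-storage client (the helper name, bucket, and path are assumptions, not from the original code):

from google.cloud import storage


def read_sql_from_gcs(bucket_name, blob_path):
    # Downloads the contents of a SQL file stored in GCS as a string.
    client = storage.Client()
    return client.bucket(bucket_name).blob(blob_path).download_as_text()

# Usage inside execute_queries(), e.g.:
# query = read_sql_from_gcs('my-airflow-bucket', sql_file)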
with models.DAG(dag_id='run-sql-files',
                schedule_interval='0 */3 * * *',
                user_defined_macros={"COMPOSER": COMPOSER},
                default_args=default_dag_args,
                concurrency=2,
                max_active_runs=1,
                catchup=False) as dag:

    execute_queries_task = PythonOperator(
        task_id="task",
        python_callable=execute_queries
    )

    execute_queries_task
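To get back to your original goal of collecting the errors in a dictionary and displaying them at the end instead of failing the task, here is a hedged sketch building on the same execute_queries idea (the dictionary layout mirrors your other_errors; it assumes sql_files already holds the SQL text):

def execute_queries_and_report():
    client = bigquery.Client()
    other_errors = {}
    for sql_file in sql_files:
        query_job = client.query(sql_file)
        try:
            query_job.result()
        except Exception as e:
            # Record the failure instead of letting the task fail.
            other_errors[str(e)] = other_errors.get(str(e), []) + [sql_file]
    # Display everything at the end, as print_errors() was meant to do.
    for error, files in other_errors.items():
        logging.warning("Warning: %s for the following SQL files:", error)
        for file in files:
            logging.warning(file)

Because all queries run inside a single PythonOperator, an individual query failure never marks the Airflow task as failed, and the summary is logged once at the end of the same run.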