Airflow taskinstance marks the task as success, but local_task_job says the task exited with return code 1



My problem:

I am running Airflow with Docker. When I run a task in my DAG, the log shows that it is marked as success, but it also shows that the task exited with return code 1.

What I have tried:

  • I tried increasing the amount of memory allocated to the container
  • I removed the email-on-failure logic from the DAG (see the sketch after this list)
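
A minimal sketch of the email settings I removed (my assumption of the relevant default_args keys, shown explicitly disabled rather than deleted; they also appear commented out in the DAG below):

default_args = {
    'owner': 'airflow',
    'email_on_failure': False,  # do not send an email when a task fails
    'email_on_retry': False,    # do not send an email when a task retries
}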

My DAG


#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
# [START tutorial]
# [START import_module]
from datetime import timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# [END import_module]
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    # 'depends_on_past': False,
    # 'email': ['courtney.poles@alteryx.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # 'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]
# [START instantiate_dag]
with DAG(
    'baton',
    default_args=default_args,
    description='A sample Airflow DAG that runs one in-process and one out-of-process job.',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['baton_example'],
) as dag:
    # [END instantiate_dag]

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    # [START basic_task]
    t1 = BashOperator(
        task_id='in_process_job',
        bash_command='python /opt/airflow/dags/scripts/baton_in_process.py',
        # retries=1,
    )
    # t2 = BashOperator(
    #     task_id='oop_job',
    #     bash_command='python /opt/airflow/scripts/test.py',
    #     retries=1,
    # )
    t1  # >> t2
    # [END basic_task]
    # [START documentation]
    # t1.doc_md = dedent(
    #     """
    # #### Task Documentation
    # You can document your task using the attributes `doc_md` (markdown),
    # `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    # rendered in the UI's Task Instance Details page.
    # ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    #
    #     """
    # )
    #
    # dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    # dag.doc_md = """
    # This is documentation placed anywhere
    # """  # otherwise, type it like this
    # [END documentation]

    # [START jinja_template]
    # templated_command = dedent(
    #     """
    # {% for i in range(5) %}
    #     echo "{{ ds }}"
    #     echo "{{ macros.ds_add(ds, 7) }}"
    #     echo "{{ params.my_param }}"
    # {% endfor %}
    # """
    # )
    # t3 = BashOperator(
    #     task_id='templated',
    #     depends_on_past=False,
    #     bash_command=templated_command,
    #     params={'my_param': 'Parameter I passed in'},
    # )
    # [END jinja_template]

# [END tutorial]
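
One note before the log: two different return codes appear in it. The line from subprocess.py reports the exit code of my bash command (0), while the final line from local_task_job.py reports the exit code of the task-runner process (1). As far as I understand (this is my assumption, not something I have confirmed in the docs), the task's SUCCESS/FAILED state follows only the command's exit code. A hypothetical pair of tasks, written as if added to the DAG above, to illustrate:

from airflow.operators.bash import BashOperator

# Hypothetical illustration: a BashOperator's state follows the wrapped
# command's exit code, not the exit code of the supervising task runner.
succeeds = BashOperator(
    task_id='exits_zero',
    bash_command='exit 0',  # command exits 0, task marked SUCCESS
    dag=dag,
)
fails = BashOperator(
    task_id='exits_one',
    bash_command='exit 1',  # command exits 1, task marked FAILED
    dag=dag,
)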

My log file

[2021-07-07 18:59:44,061] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: baton.in_process_job 2021-07-07T18:59:40.183659+00:00 [None]>
[2021-07-07 18:59:44,069] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: baton.in_process_job 2021-07-07T18:59:40.183659+00:00 [None]>
[2021-07-07 18:59:44,069] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2021-07-07 18:59:44,070] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2021-07-07 18:59:44,074] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-07-07 18:59:44,090] {taskinstance.py:1087} INFO - Executing <Task(BashOperator): in_process_job> on 2021-07-07T18:59:40.183659+00:00
[2021-07-07 18:59:44,096] {standard_task_runner.py:52} INFO - Started process 1302 to run task
[2021-07-07 18:59:44,099] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'baton', 'in_process_job', '2021-07-07T18:59:40.183659+00:00', '--job-id', '139', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/baton_dag.py', '--cfg-path', '/tmp/tmpmew1_wqd', '--error-file', '/tmp/tmp9qfruch0']
[2021-07-07 18:59:44,100] {standard_task_runner.py:77} INFO - Job 139: Subtask in_process_job
[2021-07-07 18:59:44,135] {logging_mixin.py:104} INFO - Running <TaskInstance: baton.in_process_job 2021-07-07T18:59:40.183659+00:00 [running]> on host 665b4e46f1a2
[2021-07-07 18:59:44,177] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=baton
AIRFLOW_CTX_TASK_ID=in_process_job
AIRFLOW_CTX_EXECUTION_DATE=2021-07-07T18:59:40.183659+00:00
[2021-07-07 18:59:44,178] {subprocess.py:52} INFO - Tmp dir root location: 
/tmp
[2021-07-07 18:59:44,179] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'python /opt/***/dags/scripts/baton_in_process.py']
[2021-07-07 18:59:44,187] {subprocess.py:75} INFO - Output:
[2021-07-07 18:59:44,383] {subprocess.py:79} INFO - ('Bearer eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJYbW9zWUpINjNNZktBZzdtOW05cWR2QVV2eHJ4OGQyektZekczYmpXLWtBIn0.eyJleHAiOjE2MjU2ODQ0NDQsImlhdCI6MTYyNTY4NDM4NCwianRpIjoiNGVjZDQ4NjAtYjQwNS00OWYxLTg4YjgtZDJhOGI5NmI4ZWM0IiwiaXNzIjoiaHR0cDovL2xvY2FsaG9zdDo5MDgwL2F1dGgvcmVhbG1zL21hc3RlciIsInN1YiI6IjVkYjc4YTE4LWUzMWUtNDdlNy1hMjFiLWQ0ZWE3NDZjZDAwNCIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLWNsaSIsInNlc3Npb25fc3RhdGUiOiI0MzdlMjNlYS1lODlmLTQzYWYtOWVjNS00YjlkZjkyOGVmZmQiLCJhY3IiOiIxIiwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJhZG1pbiJ9.MLmSdu0oaFEJPv74uprZFszHD9PF0Y20LmwBhUZjRIRb9pg164mGTXsVf1wdzHZa9M6GjBE7azrfPFvX9A-bRN9BW7Ah7QGUprf20hCHktEPFk3_B_IHJBzsMo6cb6lJ1DcFKZR2mG6nVWxsAy8zVITmJbhcdlcA8cWYl7xbom6knrfNCBwvG7zeJHpHwl04mQVUtUCvwH13l0D_LoDGhUTRALdwTz3aeNnVzQ_KdLmUceU8pPUMaccm7L0o3AkY0YqGq08oLAI9ve3vOqCkMgBl8XTaEWEgzPxJOysPmUz6nOvCY6TqEJj90Mv0Mw2J-VrU4O7kkCtX_quTAUU6TA', {'returncode': 0, 'stdout': '  % Total    
[2021-07-07 18:59:45,131] {subprocess.py:79} INFO - job posted
[2021-07-07 18:59:45,132] {subprocess.py:79} INFO - {'returncode': 0, 'stdout': '* Expire in 0 ms for 6 (transfer 0x555826d09fb0)n* Expire in 1 ms for 1 (transfer 0x555826d09fb0)n  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Currentn                                 Dload  Upload   Total   Spent    Left  Speednn  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0* Expire in 3 ms for 1 (transfer 0x555826d09fb0)n* Expire in 3 ms for 1 (transfer 0x555826d09fb0)n*   Trying 172.18.0.6...n* TCP_NODELAY setn* Expire in 200 ms for 4 (transfer 0x555826d09fb0)n* Connected to server (172.18.0.6) port 8080 (#0)n> GET /v1/s/master/jobs/e55db103-f196-4566-8eed-5db5c02c85ab HTTP/1.1n> Host: server:8080n> User-Agent: curl/7.64.0n> Accept: */*n> Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJYbW9zWUpINjNNZktBZzdtOW05cWR2QVV2eHJ4OGQyektZekczYmpXLWtBIn0.eyJleHAiOjE2MjU2ODQ0NDUsImlhdCI6MTYyNTY4NDM4NSwianRpIjoiZWU1NGE2M2EtOGViYy00MGFjLTg0NjktNzc1YjRjODU2ZWEyIiwiaXNzIjoiaHR0cDovL2xvY2FsaG9zdDo5MDgwL2F1dGgvcmVhbG1zL21hc3RlciIsInN1YiI6IjVkYjc4YTE4LWUzMWUtNDdlNy1hMjFiLWQ0ZWE3NDZjZDAwNCIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLWNsaSIsInNlc3Npb25fc3RhdGUiOiJiOTkzNGZjZS1hMmYwLTQ4MzMtOTM4OC0wMmIzYWRiMTYzMjEiLCJhY3IiOiIxIiwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJhZG1pbiJ9.FevvIr6HDTIDsgYby3kn-LZgNxzgRba6Hjn9pK4f1Bxxnx0tkSjz1uEUXLAz0uoJVPdMqzQhvBicTfZK3u8wyhhWebx6nTGi8meGW4Ue1QviBHK7Jf7jfUCPgqvJwKQcpdBczMEENck_g0O9oMPgsLkOk0GAeeqnXfzht_Wm_5uxanfLFOVrpU657JUfVt6FWZuzhnp4RXSeLDviy54Qgv9kBWtAzeT4fp-AlzsyEGUCAGy4tPjRC5F8zbFAQq2vSWmsLVCgNm2epI67yqN5zF5iZMChkigFv3y9AQB3IeBV4W0fOIqRINuu9OPBRCAU72DbgTNOBV76Pu3KmVo9_An> n< HTTP/1.1 200 OKn< Server: akka-http/10.2.3n< Date: Wed, 07 Jul 2021 18:59:45 GMTn< Content-Type: application/jsonn< Content-Length: 1385n< n{ [1385 bytes data]nn100  1385  100  1385    0     0  38472      0 --:--:-- --:--:-- --:--:-- 38472n* Connection #0 to host server left intactn', 'stderr': None} job_status: completed
[2021-07-07 18:59:45,144] {subprocess.py:83} INFO - Command exited with return code 0
[2021-07-07 18:59:45,200] {taskinstance.py:1191} INFO - Marking task as SUCCESS. dag_id=baton, task_id=in_process_job, execution_date=20210707T185940, start_date=20210707T185944, end_date=20210707T185945
[2021-07-07 18:59:45,277] {local_task_job.py:151} INFO - Task exited with return code 1

Latest update

I resolved this error once I triggered the DAG to run from the UI. From then on, I was able to clear the schedule for each task and execute each one individually with success; a rough programmatic equivalent is sketched below.
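
A rough sketch of what that clearing step looks like in code, assuming Airflow 2.x's DAG.clear() accepts these arguments (the dates are placeholders; in practice I did this through the UI):

from datetime import datetime

# Roughly what the UI's "Clear" button does for a single task instance;
# `dag` is the DAG object defined above.
dag.clear(
    task_ids=['in_process_job'],      # clear only this task
    start_date=datetime(2021, 7, 7),  # placeholder window
    end_date=datetime(2021, 7, 8),
)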
