Airflow BashOperator finishes with return code 0, but Airflow marks the task as failed



I am working with Airflow. I have several BashOperators that call Python code. Normally everything runs fine. However, since yesterday I have been facing a situation I cannot explain. In the task's log, everything looks normal, as shown below:

*** Reading local file: /opt/airflow/logs/dag_id=derin_emto_preprocess/run_id=manual__2022-10-01T13:54:50.246801+00:00/task_id=emto_preprocess-month0day0/attempt=1.log
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: derin_emto_preprocess.emto_preprocess-month0day0 manual__2022-10-01T13:54:50.246801+00:00 [queued]>
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: derin_emto_preprocess.emto_preprocess-month0day0 manual__2022-10-01T13:54:50.246801+00:00 [queued]>
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1357} INFO - Starting attempt 1 of 1
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1377} INFO - Executing <Task(BashOperator): emto_preprocess-month0day0> on 2022-10-01 13:54:50.246801+00:00
[2022-10-01, 13:55:21 UTC] {standard_task_runner.py:52} INFO - Started process 624 to run task
[2022-10-01, 13:55:21 UTC] {standard_task_runner.py:79} INFO - Running: ['***', 'tasks', 'run', 'derin_emto_preprocess', 'emto_preprocess-month0day0', 'manual__2022-10-01T13:54:50.246801+00:00', '--job-id', '8958', '--raw', '--subdir', 'DAGS_FOLDER/derin_emto_preprocess.py', '--cfg-path', '/tmp/tmpjn_8tmiv', '--error-file', '/tmp/tmp_jr_2w3j']
[2022-10-01, 13:55:21 UTC] {standard_task_runner.py:80} INFO - Job 8958: Subtask emto_preprocess-month0day0
[2022-10-01, 13:55:21 UTC] {task_command.py:369} INFO - Running <TaskInstance: derin_emto_preprocess.emto_preprocess-month0day0 manual__2022-10-01T13:54:50.246801+00:00 [running]> on host 5b44f8453a08
[2022-10-01, 13:55:21 UTC] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=derin_emto_preprocess
AIRFLOW_CTX_TASK_ID=emto_preprocess-month0day0
AIRFLOW_CTX_EXECUTION_DATE=2022-10-01T13:54:50.246801+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-01T13:54:50.246801+00:00
[2022-10-01, 13:55:21 UTC] {subprocess.py:62} INFO - Tmp dir root location:
/tmp
[2022-10-01, 13:55:21 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'python /opt/***/dags/scripts/derin/pipeline/pipeline.py --valid_from=20200101 --valid_until=20200102 --purpose=emto_preprocess --module=emto_preprocess --***=True']
[2022-10-01, 13:55:21 UTC] {subprocess.py:85} INFO - Output:
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:22 :  Hello, world!
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:22 :  [20200101, 20200102)
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:22 :  Running emto_preprocess purpose
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - Current directory : /opt/***/dags
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:22 :  Airflow parameter passed: changing configuration..
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:24 :  Parallel threads: 15
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:24 :  External money transfer out: preprocess is starting..
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO -
Thread None for emto_preprocess:   0%|          | 0/1 [00:00<?, ?it/s]
Thread None for emto_preprocess: 100%|██████████| 1/1 [00:00<00:00, 12633.45it/s]
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:24 :  DEBUG: Checking existing files
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:24 :  This module is already processed
[2022-10-01, 13:55:24 UTC] {subprocess.py:92} INFO - 2022-10-01 13:55:24 :  Good bye!
[2022-10-01, 13:55:24 UTC] {subprocess.py:96} INFO - Command exited with return code 0
[2022-10-01, 13:55:24 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=derin_emto_preprocess, task_id=emto_preprocess-month0day0, execution_date=20221001T135450, start_date=20221001T135521, end_date=20221001T135524
[2022-10-01, 13:55:24 UTC] {local_task_job.py:156} INFO - Task exited with return code 0
[2022-10-01, 13:55:25 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

However, Airflow marks this task as failed. How can I solve this problem?

I understood and solved this strange problem. When Airflow parses and loads DAGs, it reads all files in the /Airflow/dags/ folder. I had a 250 GB data-storage folder there consisting of many Feather files. I suspect that reading those files took too long and caused this situation. The solution was to create a .airflowignore file and add the other directories (those that do not contain DAG files) to it.
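For illustration, a minimal .airflowignore sketch under the assumptions above (the directory name data_store/ is a hypothetical stand-in for the large Feather-file folder; adjust to your actual layout). The file goes in the DAGs folder itself, and by default each line is treated as a regular expression matched against paths under that folder:

```text
# .airflowignore — placed in the DAGs folder (e.g. /opt/airflow/dags/)
# Each non-comment line is, by default, a regular expression matched
# against file paths relative to the DAGs folder; matching paths are
# skipped entirely during DAG parsing.
data_store/.*
```

Note that .airflowignore only affects DAG parsing, not task execution, so scripts invoked by a BashOperator keep working even if their directory is ignored.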
