Airflow task gets stuck in retry state for 30 minutes, then runs successfully



I'm relatively new to Airflow. I've read dozens of posts but can't find a workable solution. I'd really appreciate it if someone could point me in the right direction.

I have a job that needs to run every minute (code below). It has 4 tasks: move files within AWS S3, issue a SQL command to an Impala database, issue a second SQL command, and then move the files in S3 to another location.

All of the tasks eventually run successfully, but only after each one has sat in a retry state for about 30 minutes. Given that this is supposed to run every minute, that obviously isn't workable.

I can't find anything useful in the logs. All I have to go on is:

Task is not ready for retry yet but will be retried automatically. Current date is 2020-01-20T20:22:39.926112+00:00 and task will be retried at 2020-01-20T20:23:27.234433+00:00.

The log for one of the tasks looks like this:

--------------------------------------------------------------------------------
[2020-01-20 19:19:15,760] {taskinstance.py:842} INFO - Starting attempt 1 of 2
[2020-01-20 19:19:15,760] {taskinstance.py:843} INFO -
--------------------------------------------------------------------------------
[2020-01-20 19:19:15,773] {taskinstance.py:862} INFO - Executing <Task(PythonOperator): invalidate_metadata> on 2020-01-20T19:11:00+00:00
[2020-01-20 19:19:15,773] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'p_weather_home_commercial_v1_ingest', 'invalidate_metadata', '2020-01-20T19:11:00+00:00', '--job_id', '64', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/weather_home-commercial_v1-ingest.py', '--cfg_path', '/tmp/tmp2i9g9v59']
[2020-01-20 19:19:16,914] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:16,913] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=11729
[2020-01-20 19:19:17,633] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,632] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-01-20 19:19:17,634] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,634] {dagbag.py:92} INFO - Filling up the DagBag from /root/airflow/dags/weather_home-commercial_v1-ingest.py
[2020-01-20 19:19:17,812] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,812] {credentials.py:1032} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2020-01-20 19:19:17,829] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:17,829] {connectionpool.py:735} INFO - Starting new HTTPS connection (1): secretsmanager.us-west-2.amazonaws.com
[2020-01-20 19:19:18,092] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata [2020-01-20 19:19:18,091] {cli.py:545} INFO - Running <TaskInstance: p_weather_home_commercial_v1_ingest.invalidate_metadata 2020-01-20T19:11:00+00:00 [running]> on host ip-10-190-8-162.us-west-2.compute.internal
[2020-01-20 19:19:18,137] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=p_weather_home_commercial_v1_ingest
AIRFLOW_CTX_TASK_ID=invalidate_metadata
AIRFLOW_CTX_EXECUTION_DATE=2020-01-20T19:11:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-01-20T19:11:00+00:00
[2020-01-20 19:19:18,156] {logging_mixin.py:112} INFO - [2020-01-20 19:19:18,156] {base_hook.py:84} INFO - Using connection to: id: Impala-prod. Host: jdbc:impala://internal-impala-internal-100737082.us-west-2.elb.amazonaws.com:21050/default;transportMode=sasl;AuthMech=3;SSL=1;AllowSelfSignedCerts=1;CAIssuedCertNamesMismatch=1;, Port: None, Schema: None, Login: cdh6prod.airflow, Password: XXXXXXXX, extra: {'extra__jdbc__drv_path': '/opt/airflow/var/lib/ImpalaJDBC4.jar', 'extra__jdbc__drv_clsname': 'com.cloudera.impala.jdbc4.Driver', 'extra__google_cloud_platform__project': '', 'extra__google_cloud_platform__key_path': '', 'extra__google_cloud_platform__keyfile_dict': '', 'extra__google_cloud_platform__scope': '', 'extra__google_cloud_platform__num_retries': None, 'extra__grpc__auth_type': '', 'extra__grpc__credentials_pem_file': '', 'extra__grpc__scopes': ''}
[2020-01-20 19:19:18,923] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata log4j:WARN No appenders could be found for logger (com.cloudera.impala.jdbc4.internal.apache.thrift.transport.TSaslTransport).
[2020-01-20 19:19:18,923] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata log4j:WARN Please initialize the log4j system properly.
[2020-01-20 19:19:18,923] {base_task_runner.py:115} INFO - Job 64: Subtask invalidate_metadata log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2020-01-20 19:19:19,293] {logging_mixin.py:112} INFO - [2020-01-20 19:19:19,292] {dbapi_hook.py:171} INFO - INVALIDATE METADATA weather_home.commercial_v1_loading;
[2020-01-20 19:19:19,497] {python_operator.py:114} INFO - Done. Returned value was: Ran Query '{}' against prod impala
[2020-01-20 19:19:20,700] {logging_mixin.py:112} INFO - [2020-01-20 19:19:20,700] {local_task_job.py:124} WARNING - Time since last heartbeat(0.02 s) < heartrate(5.0 s), sleeping for 4.980271 s
[2020-01-20 19:19:25,685] {logging_mixin.py:112} INFO - [2020-01-20 19:19:25,684] {local_task_job.py:103} INFO - Task exited with return code 0

All of the tasks produce similar output, though, showing that they exited with return code 0.

The DAG is generated dynamically by a script that queries our internally designed metadata database. We have a few hundred of these to build, so it needs to be automated.

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
import airflow.hooks.S3_hook
from airflow.hooks.jdbc_hook import JdbcHook
from pymongo import MongoClient
import json
import boto3
import base64
from botocore.exceptions import ClientError
import sys

####### config ##################################################################################################
namespace = "dish.iot.housefly"
name = "Weather_Home"
version = 1.0
####### get the schema ##########################################################################################
def get_secret(secret):
    secret_name = secret
    region_name = "us-west-2"
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.
    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'DecryptionFailureException':
            # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException':
            # An error occurred on the server side.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException':
            # You provided an invalid value for a parameter.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException':
            # You provided a parameter value that is not valid for the current state of the resource.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException':
            # We can't find the resource that you asked for.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
    else:
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
            return(secret)
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return(decoded_binary_secret)

sb_creds = json.loads(get_secret("*******"))

client = MongoClient("mongodb://******:"+ sb_creds["Pass"] +"@************.us-west-2.docdb.amazonaws.com:*****/?ssl=true&ssl_ca_certs=/root/airflow/rds-combined-ca-bundle.pem&replicaSet=rs0")
db = client["Schema_Store"]
dbcollection = db[namespace]
schema = dbcollection.find_one({"Namespace":namespace,"Name":name,"Version":version})
###### Airflow ################################################################################################
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 31),
    'email': ['********'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG('p_{}_{}_ingest'.format(schema['Database'],schema['Table']), default_args=default_args, catchup=False, max_active_runs=1, schedule_interval=schema['Cron'])
################################################################################################################
def executeSQL(sqlString):
    impHook = JdbcHook(jdbc_conn_id="Impala-prod")
    impHook.run(sqlString)
    return("Ran Query '{}' against prod impala")

#########################################################################################
t1 = BashOperator(
    task_id="staging_to_loading",
    bash_command="aws s3 mv s3://{0}{1}/staging s3://{0}{1}/loading --recursive".format(schema['S3_bucket'],schema['S3_key']),
    dag=dag
)
t2 = PythonOperator(
    task_id="invalidate_metadata",
    python_callable=executeSQL,
    op_kwargs={
        'sqlString':'INVALIDATE METADATA {}.{}_loading;'.format(schema['Database'],schema['Table']),
    },
    dag=dag,
)
t3 = PythonOperator(
    task_id="load_data_into_impala",
    python_callable=executeSQL,
    op_kwargs={
        'sqlString':'INSERT INTO {0}.{1}_today PARTITION (tenantid, applicationid, dt) SELECT * FROM {0}.{1}_loading;'.format(schema['Database'],schema['Table']),
    },
    dag=dag,
)
t4 = BashOperator(
    task_id="loading_to_processed",
    bash_command="aws s3 mv s3://{0}{1}/loading s3://{0}{1}/processed --recursive".format(schema['S3_bucket'],schema['S3_key']),
    dag=dag
)
t1 >> t2 >> t3 >> t4

I think wrapping the PythonOperator's callable in a try/except (with traceback.print_exc() in the except block) would help you get more information.
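
A minimal sketch of that idea, reusing the question's executeSQL callable and the Impala-prod connection id (the try/except wrapper itself is just an illustration, not part of the original DAG):

import traceback
from airflow.hooks.jdbc_hook import JdbcHook

def executeSQL(sqlString):
    # Same body as the original callable, but any exception is printed with a
    # full stack trace before being re-raised, so the real error lands in the
    # task log instead of only the bare "will be retried" message.
    try:
        impHook = JdbcHook(jdbc_conn_id="Impala-prod")
        impHook.run(sqlString)
        return "Ran Query '{}' against prod impala".format(sqlString)
    except Exception:
        traceback.print_exc()
        raise

Re-raising keeps the failure/retry behaviour exactly as before; the only change is that the full traceback shows up in the task log.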

By the way, when I was starting to learn Airflow I used this tool to play around with the UI. Maybe you'll find it helpful too:

http://choose.tools/tool?id=Airflow&utm_source=59844912&utm_medium=airflow&utm_campaign=so
