无法使用Sendgrid在GCP composer中获取任务失败、成功和重试通知



我想使用Sendgrid在GCP composer中收到任务成功、失败和重试的电子邮件通知。

目前,我的DAG中的所有任务都在成功运行。在这种情况下,我想收到通知。

此外,当某些任务失败或正在重试时,我也想获得这些通知。我执行了以下步骤,在强制任务失败时没有收到任何通知。

  1. 创建了GCP Composer环境,添加了环境变量
SENDGRID_MAIL_FROM : abc@gmail.com
SENDGRID_API_KEY : 
  1. 创建于DAG之后
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.operators.email_operator import EmailOperator
default_args = {
'owner': 'airflow',
'depends_on_past': True,    
'start_date': datetime(2020, 3, 30),
'email': ['abc@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}

schedule_interval = "05 23 * * *"

dag = DAG(
'DAG_NAME', 
default_args=default_args, 
schedule_interval=schedule_interval
)
# Config variables
BQ_CONN_ID = ""
BQ_PROJECT = ""
BQ_DATASET = ""
## Task 1
t1 = BigQueryCheckOperator(----)
## Task 2
t2 = BigQueryCheckOperator(----)
## Task 3
t3 = BigQueryOperator(----)
t4  = EmailOperator(
task_id='send_email',
to='abc@gmail.com',
subject='Airflow Alert',
html_content=""" <h3>Email Test</h3> """,
dag=dag
)
# Setting up Dependencies
t1>>t2>>t3>>t4

我遗漏了什么吗?请告诉我需要做什么,谢谢。

首先,您需要检查您使用的Composer和Sendgrid的版本。

例如,airflow-1.10.3上支持的Sendgrid的最新版本是v5.6.0。您可以参考气流的setup.py,了解为特定气流版本安装的依赖项。

我建议您再次查看有关使用Cloud Composer设置Sendgrid的说明。确保以下几点:

  • 您按照指南中的说明设置环境变量,要将Sendgrid配置为您的电子邮件服务器,您需要获得您的SENDGRID_API_KEY(您是否具有正确的权限生成它?密钥至少必须具有Mail send权限才能发送电子邮件(和SENDGRID_MAIL_FROM(结构正确吗?noreply-composer@(作为环境变量
  • airflow.cfg文件中,检查email_backend变量是否设置为使用Sendgrid:
email_backend = airflow.contrib.utils.sendgrid.send_email
  • 如指南所述,尝试发送测试DAG,例如,您可以使用以下方法:
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'name.surname',
'start_date': days_ago(1),
'email_on_failure': True,
'email': ['name.surname@company.com'],
}
dag = DAG(
'mail-test',
schedule_interval='@once',
default_args=default_args,
)
send_mail = EmailOperator(
task_id='sendmail',
to='name.surname@company.com',
subject='TEST Mail from Cloud Composer',
html_content='Mail Contents',
dag=dag,
)
failed_bash = BashOperator(
task_id='run_bash',
bash_command='exit 1',
dag=dag,
)
send_mail >> failed_bash

此外,请检查您的电子邮件客户端中的垃圾邮件过滤器。如果这种情况继续失败,我会开始怀疑防火墙规则(如果你已经编辑过它们(可能是导致问题的原因。

让我知道结果。我希望它能有所帮助。

相关内容

最新更新