我想使用Sendgrid在GCP composer中收到任务成功、失败和重试的电子邮件通知。
目前,我的DAG中的所有任务都在成功运行。在这种情况下,我想收到通知。
此外,当某些任务失败或正在重试时,我也想获得这些通知。我执行了以下步骤,在强制任务失败时没有收到任何通知。
- 创建了GCP Composer环境,添加了环境变量
SENDGRID_MAIL_FROM : abc@gmail.com
SENDGRID_API_KEY :
- 创建于DAG之后
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.operators.email_operator import EmailOperator
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 3, 30),
'email': ['abc@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
schedule_interval = "05 23 * * *"
dag = DAG(
'DAG_NAME',
default_args=default_args,
schedule_interval=schedule_interval
)
# Config variables
BQ_CONN_ID = ""
BQ_PROJECT = ""
BQ_DATASET = ""
## Task 1
t1 = BigQueryCheckOperator(----)
## Task 2
t2 = BigQueryCheckOperator(----)
## Task 3
t3 = BigQueryOperator(----)
t4 = EmailOperator(
task_id='send_email',
to='abc@gmail.com',
subject='Airflow Alert',
html_content=""" <h3>Email Test</h3> """,
dag=dag
)
# Setting up Dependencies
t1>>t2>>t3>>t4
我遗漏了什么吗?请告诉我需要做什么,谢谢。
首先,您需要检查您使用的Composer和Sendgrid的版本。
例如,airflow-1.10.3
上支持的Sendgrid的最新版本是v5.6.0
。您可以参考气流的setup.py,了解为特定气流版本安装的依赖项。
我建议您再次查看有关使用Cloud Composer设置Sendgrid的说明。确保以下几点:
- 您按照指南中的说明设置环境变量,要将Sendgrid配置为您的电子邮件服务器,您需要获得您的
SENDGRID_API_KEY
(您是否具有正确的权限生成它?密钥至少必须具有Mail send
权限才能发送电子邮件(和SENDGRID_MAIL_FROM
(结构正确吗?noreply-composer@
(作为环境变量 - 在
airflow.cfg
文件中,检查email_backend
变量是否设置为使用Sendgrid:
email_backend = airflow.contrib.utils.sendgrid.send_email
- 如指南所述,尝试发送测试DAG,例如,您可以使用以下方法:
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'name.surname',
'start_date': days_ago(1),
'email_on_failure': True,
'email': ['name.surname@company.com'],
}
dag = DAG(
'mail-test',
schedule_interval='@once',
default_args=default_args,
)
send_mail = EmailOperator(
task_id='sendmail',
to='name.surname@company.com',
subject='TEST Mail from Cloud Composer',
html_content='Mail Contents',
dag=dag,
)
failed_bash = BashOperator(
task_id='run_bash',
bash_command='exit 1',
dag=dag,
)
send_mail >> failed_bash
此外,请检查您的电子邮件客户端中的垃圾邮件过滤器。如果这种情况继续失败,我会开始怀疑防火墙规则(如果你已经编辑过它们(可能是导致问题的原因。
让我知道结果。我希望它能有所帮助。