气流-当传感器设置为软故障时,如何发送电子邮件警报



在下面的dag中,传感器A设置为soft_fail=True,因为如果A出现故障,我希望跳过B和C。问题是,当A失败时,我仍然希望收到电子邮件提醒。但是,当soft_fail为true时,当传感器没有检测到任何东西时,A被标记为成功,并且不会发出电子邮件警报。有人能帮忙指出如何做到这一点吗?非常感谢。

A(传感器,soft_fail=True(>gt;B>gt;C

气流传感器发生故障时标记为skipped(而非success(,soft_failTrue

没有选项可以在跳过而不是回拨时添加电子邮件。但是,您可以从操作员EmailOperator创建一个新任务,该任务在传感器a标记为跳过时运行。不幸的是,当上游被跳过时,没有触发规则来运行任务,但你可以创建一个新的操作员来检查a的状态并根据它发送电子邮件。

from airflow.operators.email import EmailOperator
from airflow.utils.context import Context
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule

class MyNotifier(EmailOperator):
def __int__(self, monitor_task_id: str, notify_on_state: str, *args, **kwargs):
self.monitor_task_id = monitor_task_id
self.notify_on_state = notify_on_state
super().__init__(*args, **kwargs)
def execute(self, context: Context):
task_to_check = context["dag_run"].get_task_instance(task_id=self.monitor_task_id)
if task_to_check.state == self.notify_on_state:
super().execute(context)

notification_task = MyNotifier(
task_id="sensor_skip_notifier",
monitor_task_id="A",
trigger_rule=TriggerRule.ALL_DONE,  # to run the task when A is done regardless the state
notify_on_state=TaskInstanceState.SKIPPED,
to="<email>",
subject="<subject>",
html_content="<content>",  # you can use jinja to add run info
)
A >> notification_task

最新更新