Airflow sla_miss_callback function not triggering



I have been trying to get a Slack message callback to trigger on SLA misses. I've noticed that:

  1. SLA misses get registered successfully in the Airflow web UI at slamiss/list/

  2. on_failure_callback works successfully

However, the sla_miss_callback function itself never gets triggered.

What I've tried:

  • Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level

  • Checking the logs on our scheduler and workers for SLA-related messages (see also here), but we haven't seen anything

  • The Slack message callback function works if it is called from any other basic task or function

import time
import logging

import airflow
from datetime import timedelta
from airflow.operators.python_operator import PythonOperator

# send_task_failed_msg_to_slack and send_sla_miss_message_to_slack are our
# Slack callback functions, defined elsewhere.

LOGGER = logging.getLogger(__name__)

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0,
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")

sleep = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
)

sleep >> simple_task

Airflow 1 (not tested on Airflow 2!)

An example with both SLA missed and Execution Timeout alerts:

  • First, you will get the SLA missed alert 2 minutes after the task starts running,
  • then, 4 minutes in, the task will fail with an Execution Timeout alert.
"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout

In addition, the message contains a log_url link, so you can easily open the task log in Airflow.

Example Slack message:

import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple

from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator

SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."


def send_slack_alert_sla_miss(
    dag: DAG,
    task_list: str,
    blocking_task_list: str,
    slas: List[Tuple],
    blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)


def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")

    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT

    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )


def send_slack_alert(
    message: str,
    context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}

with DAG(
    dag_id="test_sla",
    schedule_interval="*/5 * * * *",
    start_date=datetime(2021, 1, 11),
    default_args=default_args,
    sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        # MIKE MILLIGAN ADDED THIS
        sla=timedelta(minutes=2),
        python_callable=lambda: time.sleep(300),
    )

I ran into a similar situation once.
While digging through the scheduler logs, I found the following error:

[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO -  --------------> ABOUT TO CALL SLA MISS CALL BACK  
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG 
sla_miss_alert() takes 1 positional arguments but 5 were given

The problem is that your sla_miss_callback function only expects 1 argument, but it really should look like this:

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>

For reference, check out the Airflow source code.

Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself, as in the sketch below.
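
A minimal sketch of that placement, just for reference (the DAG id, schedule, and the body of sla_miss_alert here are placeholders, not the poster's actual code):

from datetime import datetime, timedelta
from airflow.models import DAG

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Placeholder body: just report which tasks missed their SLA."""
    print(f"SLA missed in DAG {dag.dag_id}: {task_list}")

dag = DAG(
    dag_id="example_sla_dag",                    # placeholder DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval="*/5 * * * *",
    sla_miss_callback=sla_miss_alert,            # set on the DAG, not in default_args
    default_args={"sla": timedelta(minutes=1)},  # the SLA itself may live in default_args
)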

It seems the only way to get sla_miss_callback to work is to explicitly declare the parameters it expects... nothing else worked for me. Otherwise the arguments 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis' simply never get delivered to the callback.

TypeError: print_sla_miss() missing 5 required positional arguments: 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis'
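
In other words, the callback has to be defined with all five parameters. A minimal sketch (the body and the defaults are just an illustration, not the original poster's code):

def print_sla_miss(dag=None, task_list=None, blocking_task_list=None,
                   slas=None, blocking_tis=None):
    """Placeholder body: log what the scheduler actually passed in."""
    print(f"SLA missed in DAG {getattr(dag, 'dag_id', dag)}; "
          f"blocking tasks: {blocking_task_list}; slas: {slas}")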

A lot of the answers get 90% of the way there, so I wanted to share my example using bash operators, which combines what I found in all of the responses above and other resources.

The most important points are defining sla_miss_callback in the DAG definition rather than in default_args, and not passing context to the SLA function.

"""
A simple example showing the basics of using a custom SLA notification response.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.slack_operator import SlackAPIPostOperator
from slack import slack_attachment
from airflow.hooks.base_hook import BaseHook
import urllib
#slack alert for sla_miss
def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
dag_id = slas[0].dag_id
task_id = slas[0].task_id
execution_date = slas[0].execution_date.isoformat()
base_url = 'webserver_url_here'
encoded_execution_date = urllib.parse.quote_plus(execution_date)
dag_url = (f'{base_url}/graph?dag_id={dag_id}'
f'&execution_date={encoded_execution_date}')
message = (f':alert: *Airflow SLA Miss*'
f'nn'
f'*DAG:* {dag_id}n'
f'*Task:* {task_id}n'
f'*Execution Date:* {execution_date}'
f'nn'
f'<{dag_url}|Click here to view DAG>')
sla_miss_alert = SlackAPIPostOperator(
task_id='slack_sla_miss',
channel='airflow-alerts-test',
token=str(BaseHook.get_connection("slack").password),
text = message
)
return sla_miss_alert.execute()
#slack alert for successful task completion
def slack_success_task(context):
success_alert = SlackAPIPostOperator(
task_id='slack_success',
channel='airflow-alerts-test',
token=str(BaseHook.get_connection("slack").password),
text = "Test successful"
)
return success_alert.execute(context=context)

default_args = {
"depends_on_past": False,
'start_date': datetime(2020, 11, 18),
"retries": 0
}
# Create a basic DAG with our args
# Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
dag = DAG(
dag_id='sla_slack_v6',
default_args=default_args,
sla_miss_callback=slack_sla_miss,
catchup=False,
# A common interval to make the job fire when we run it
schedule_interval=timedelta(minutes=3)
)
# Add a task that will always fail the SLA
t1 = BashOperator(
task_id='timeout_test_sla_miss',
# Sleep 60 seconds to guarantee we miss the SLA
bash_command='sleep 60',
# Do not retry so the SLA miss fires after the first execution
retries=0,
#on_success_callback = slack_success_task,
provide_context = True,
# Set our task up with a 10 second SLA
sla=timedelta(seconds=10),
dag=dag
)
t2 = BashOperator(
task_id='timeout_test_sla_miss_task_2',
# Sleep 30 seconds to guarantee we miss the SLA of 20 seconds set in this task
bash_command='sleep 30',
# Do not retry so the SLA miss fires after the first execution
retries=0,
#on_success_callback = slack_success_task,
provide_context = True,
# Set our task up with a 20 second SLA
sla=timedelta(seconds=20),
dag=dag
)
t3 = BashOperator(
task_id='timeout_test_sla_miss_task_3',
# Sleep 60 seconds to guarantee we miss the SLA
bash_command='sleep 60',
# Do not retry so the SLA miss fires after the first execution
retries=0,
#on_success_callback = slack_success_task,
provide_context = True,
# Set our task up with a 30 second SLA
sla=timedelta(seconds=30),
dag=dag
)

t1 >> t2 >> t3

I think the Airflow documentation is a bit vague on this point.

Instead of having the method signature as

def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis)

modify your signature like this:

def slack_sla_miss(*args, **kwargs)

This way, all the arguments get passed through and you won't get the error seen in the logs.
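
A minimal sketch of that approach (the logging body is just an illustration of how you might inspect what the scheduler passes in):

import logging

def slack_sla_miss(*args, **kwargs):
    # Accept whatever the scheduler passes; with the usual ordering this is
    # (dag, task_list, blocking_task_list, slas, blocking_tis).
    logging.info("sla_miss_callback called with %d positional args", len(args))
    dag = args[0] if args else kwargs.get("dag")
    logging.info("SLA missed in DAG %s", getattr(dag, "dag_id", dag))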

Learned this here: https://www.cloudwalker.io/2020/12/15/airflow-sla-management/

I ran into the same issue, but was able to get it working with the following code:


import logging as log
import airflow
import time
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow import configuration
import urllib
from airflow.operators.slack_operator import SlackAPIPostOperator


def sleep():
    """ Sleep for 2 minutes """
    time.sleep(60 * 2)
    log.info("Slept for 2 minutes")


def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


def slack_on_sla_miss(dag,
                      task_list,
                      blocking_task_list,
                      slas,
                      blocking_tis):
    log.info('Running slack_on_sla_miss')

    slack_conn_id = 'slack_default'
    slack_channel = '#general'

    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()

    base_url = configuration.get('webserver', 'BASE_URL')
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')

    message = (f':o: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view>')

    slack_op = SlackAPIPostOperator(task_id='slack_failed',
                                    slack_conn_id=slack_conn_id,
                                    channel=slack_channel,
                                    text=message)
    slack_op.execute()


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    "retries": 0,
    'priority_weight': 1,
}

dag = DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=slack_on_sla_miss,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
)

with dag:
    sleep = PythonOperator(
        task_id="sleep",
        python_callable=sleep,
    )
    simple_task = PythonOperator(
        task_id="simple_task",
        python_callable=simple_print,
        provide_context=True,
        sla=timedelta(minutes=1),
    )
    sleep >> simple_task

I ran into this myself. Unlike on_failure_callback, which looks for a Python callable, sla_miss_callback seems to require the full function call.

An example that worked for me:

def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    # <function code here>

def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    # <function code here>

dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0,
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

From what I can tell, sla_miss_callback doesn't get access to the context, which is unfortunate. Once I stopped looking for the context, I finally got my alerts.
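
If you need to bind an extra value (like a dag_id) without the context, one possible sketch, assuming the five-argument signature described in the other answers, is to use functools.partial the same way as for on_failure_callback above (the function name and body here are placeholders):

from functools import partial

def sla_miss_alert(extra_dag_id, dag, task_list, blocking_task_list, slas, blocking_tis):
    """Placeholder body: extra_dag_id is bound up front, the rest comes from the scheduler."""
    print(f"SLA missed for {extra_dag_id}; blocking tasks: {blocking_task_list}")

# Bind the extra value when wiring up the DAG, e.g.:
# dag = airflow.DAG(..., sla_miss_callback=partial(sla_miss_alert, 'sla_test'))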
