Step 1 of 1

SLA или Service Level Agreement, это соглашение между Airflow и DAG или TaskInstance (оператором), что он должен быть выполнен к определенному времени. Если это соглашение нарушено, то Airflow отправляет письмо на почту, также все т.н. SLAs Misses возможно увидеть в панели: Browse->SLA Misses.

Настройка задаётся на уровне оператора в коде (через default_args для DAG или напрямую в оператор), также для DAG можно указать callback, который будет вызван в случае нарушения соглашения (например, сообщение в телеграм).

В defaults_args нужно параметру sla передать timedelta. Для передачи callback в настройках DAG присвойте sla_miss_callback любой Callable объект.

В документации Airflow весьма скудно описан процесс настройки почтового сервера для отправки писем. Информация актуальна для версии до 2.0.1 включительно. В более поздних версиях указывать smtp_user и smtp_password нужно будет только через connection id.

По умолчанию Airflow использует SMTP backend, которые читает настройки SMTP сервера из секции smtp конфигурационного файла airflow.cfg:

[smtp]

# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.mailgun.org
smtp_starttls = True
smtp_ssl = False

smtp_user = postmaster@startdatajourney.com
smtp_password = password

smtp_port = 587

# адрес отправителя сообщений
smtp_mail_from = airflow-sla@startdatajourney.com
smtp_timeout = 30
smtp_retry_limit = 5

Я привёл пример настроек для сервиса MailGun.

Давайте взглянем на пример кода, нарушающего SLA:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(dag)
    print(task_list)
    print(blocking_task_list)
    print(slas)
    print(blocking_tis)

default_args = {
    'email': 'adil.khashtamov@gmail.com',
    'sla': timedelta(seconds=5),
}

with DAG(
    dag_id='sla_example_dag',
    sla_miss_callback=sla_miss_callback,
    start_date=datetime(2021, 3, 16),
    schedule_interval='*/5 * * * *',
    catchup=False,
    default_args=default_args,
) as dag:

    cmd = BashOperator(
        task_id='slow_task',
        bash_command='sleep 15',
        dag=dag,
    )

В этом DAG всего один таск — BashOperator, который засыпает на 15 секунд, sla для него настроен на 5 секунд. Период запуска — ежеминутно. Важно понимать, что SLA считается от времени запуска DAG, а не старта оператора. Сообщения обо всех нарушениях SLA (SLA Misses) на почту отправляются после окончания периода. Периодом в данном случае является интервал запусков. Т.е. для запущенного DAG (DagRun) 16 марта 2021 года в 00:00:00, SLA Miss придёт после 00:00:01 (это с ежеминутным запуском)

Если запуск настроен на каждый 5 минут, то SLA Miss для DagRun с execution_date = 2021-03-16T00:05:00 будет отправлен после 2021-03-16T00:10:00.

Comments