SLA или Service Level Agreement, это соглашение между Airflow и DAG или TaskInstance (оператором), что он должен быть выполнен к определенному времени. Если это соглашение нарушено, то Airflow отправляет письмо на почту, также все т.н. SLAs Misses возможно увидеть в панели: Browse->SLA Misses
.
Настройка задаётся на уровне оператора в коде (через default_args для DAG или напрямую в оператор), также для DAG можно указать callback, который будет вызван в случае нарушения соглашения (например, сообщение в телеграм).
В defaults_args нужно параметру sla
передать timedelta
. Для передачи callback в настройках DAG присвойте sla_miss_callback
любой Callable объект.
В документации Airflow весьма скудно описан процесс настройки почтового сервера для отправки писем. Информация актуальна для версии до 2.0.1 включительно. В более поздних версиях указывать smtp_user и smtp_password нужно будет только через connection id.
По умолчанию Airflow использует SMTP backend, которые читает настройки SMTP сервера из секции smtp
конфигурационного файла airflow.cfg
:
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.mailgun.org
smtp_starttls = True
smtp_ssl = False
smtp_user = postmaster@startdatajourney.com
smtp_password = password
smtp_port = 587
# адрес отправителя сообщений
smtp_mail_from = airflow-sla@startdatajourney.com
smtp_timeout = 30
smtp_retry_limit = 5
Я привёл пример настроек для сервиса MailGun.
Давайте взглянем на пример кода, нарушающего SLA:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
print(dag)
print(task_list)
print(blocking_task_list)
print(slas)
print(blocking_tis)
default_args = {
'email': 'adil.khashtamov@gmail.com',
'sla': timedelta(seconds=5),
}
with DAG(
dag_id='sla_example_dag',
sla_miss_callback=sla_miss_callback,
start_date=datetime(2021, 3, 16),
schedule_interval='*/5 * * * *',
catchup=False,
default_args=default_args,
) as dag:
cmd = BashOperator(
task_id='slow_task',
bash_command='sleep 15',
dag=dag,
)
В этом DAG всего один таск — BashOperator, который засыпает на 15 секунд, sla для него настроен на 5 секунд. Период запуска — ежеминутно. Важно понимать, что SLA считается от времени запуска DAG, а не старта оператора. Сообщения обо всех нарушениях SLA (SLA Misses) на почту отправляются после окончания периода. Периодом в данном случае является интервал запусков. Т.е. для запущенного DAG (DagRun) 16 марта 2021 года в 00:00:00, SLA Miss придёт после 00:00:01 (это с ежеминутным запуском)
Если запуск настроен на каждый 5 минут, то SLA Miss для DagRun с execution_date = 2021-03-16T00:05:00 будет отправлен после 2021-03-16T00:10:00.