Шаг 6 из 6

Конфигурация

Итак, чтобы настроить свою конфигурацию logging необходимо сделать несколько шагов:

  1. Создать свой python-модуль с конфигурацией
  2. Прописать путь до нашего модуля в airflow.cfg

Полностью перезаписывать конфигурацию Airflow я не рекомендую, поэтому мы её будем дополнять. Для этого необходимо импортировать настройки по умолчанию и сделать глубокое копирование.

from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['handlers'].update(
    {
        'telegram_handler': {
            'class': 'logging_handlers.TelegramBotHandler',
            'chat_id': '<CHAT_ID>',
            'token': '<BOT_TOKEN>',
            'level': 'ERROR',
        }
    }
)
LOGGING_CONFIG['loggers']['airflow.task']['handlers'].append('telegram_handler')

Тут я копирую структуру конфигурации и дополняю её нашим обработчиком. Для его корректной работы необходимо заменить <CHAT_ID> на свой, его можно узнать у бота userinfo, и <BOT_TOKEN>, токен вы получаете у BotFather при создании бота.

В настройках я предполагаю, что телеграм-обработчик лежит у вас в модуле logging_handlers.py, а файл с конфигурацией называется logging_conf.py.

А теперь вопрос. Куда всё это положить? В Apache Airflow есть код подготовки путей:

def prepare_syspath():
    """Ensures that certain subfolders of AIRFLOW_HOME are on the classpath"""
    if DAGS_FOLDER not in sys.path:
        sys.path.append(DAGS_FOLDER)

    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, 'config')
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)

Он обновляет sys.path, добавляя туда директорию с DAG-файлами, папку config и plugins, которые могут быть в директории AIRFLOW_HOME. Поэтому logging-конфиг можно положить в любую из этих директорий, но я не рекомендую укладывать его в директорию с DAG-файлами, т.к. она регулярно сканируется планировщиком для обнаружения новых пайплайнов. Если не хотите трогать эти пути, то можно прописать свой путь в переменную окружения PYTHONPATH. Я предпочитаю держать папку config по пути из AIRFLOW_HOME.

Выполните следующие шаги:

  1. Сохраните код с logging-конфигурацией в logging_conf.py
  2. Сохраните код с телеграм-обработчиком в logging_handlers.py
  3. Создайте папку config в директории $AIRFLOW_HOME
  4. В airflow.cfg найдите параметр logging_config_class и задайте ему значение logging_conf.LOGGING_CONFIG
  5. Перезагрузите webserver и scheduler

Готово. Теперь можно попробовать запустить DAG, где заведомо есть задача с ошибкой, уведомление в телеграм не заставит себя долго ждать.

Код с logging-конфигом и телеграм-обработчиком лежит в репозитории в папке config.

Комментарии

А если я хочу, чтобы сообщения об ошибках получили несколько клиентов, могу ли я передать в CHAT_ID список?

Можно, но в случае нужно в обработчик добавить соответствующие изменения.

А не подскажите какие необходимо внести изменения?

В конфиге logging вместо строки в chat_id передавать список, а в Handler классе соответственно получать этот список, и итерироваться по нему, отправляя сообщения каждому chat_id из этого списка.

Не хватает скринов с примерами сообщений об ошибках, а в остальном прикольно