Итак, чтобы настроить свою конфигурацию logging необходимо сделать несколько шагов:
airflow.cfg
Полностью перезаписывать конфигурацию Airflow я не рекомендую, поэтому мы её будем дополнять. Для этого необходимо импортировать настройки по умолчанию и сделать глубокое копирование.
from copy import deepcopy
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['handlers'].update(
{
'telegram_handler': {
'class': 'logging_handlers.TelegramBotHandler',
'chat_id': '<CHAT_ID>',
'token': '<BOT_TOKEN>',
'level': 'ERROR',
}
}
)
LOGGING_CONFIG['loggers']['airflow.task']['handlers'].append('telegram_handler')
Тут я копирую структуру конфигурации и дополняю её нашим обработчиком. Для его корректной работы необходимо заменить <CHAT_ID>
на свой, его можно узнать у бота userinfo, и <BOT_TOKEN>
, токен вы получаете у BotFather при создании бота.
В настройках я предполагаю, что телеграм-обработчик лежит у вас в модуле logging_handlers.py
, а файл с конфигурацией называется logging_conf.py
.
А теперь вопрос. Куда всё это положить? В Apache Airflow есть код подготовки путей:
def prepare_syspath():
"""Ensures that certain subfolders of AIRFLOW_HOME are on the classpath"""
if DAGS_FOLDER not in sys.path:
sys.path.append(DAGS_FOLDER)
# Add ./config/ for loading custom log parsers etc, or
# airflow_local_settings etc.
config_path = os.path.join(AIRFLOW_HOME, 'config')
if config_path not in sys.path:
sys.path.append(config_path)
if PLUGINS_FOLDER not in sys.path:
sys.path.append(PLUGINS_FOLDER)
Он обновляет sys.path
, добавляя туда директорию с DAG-файлами, папку config
и plugins
, которые могут быть в директории AIRFLOW_HOME
. Поэтому logging-конфиг можно положить в любую из этих директорий, но я не рекомендую укладывать его в директорию с DAG-файлами, т.к. она регулярно сканируется планировщиком для обнаружения новых пайплайнов. Если не хотите трогать эти пути, то можно прописать свой путь в переменную окружения PYTHONPATH. Я предпочитаю держать папку config
по пути из AIRFLOW_HOME
.
Выполните следующие шаги:
logging_conf.py
logging_handlers.py
$AIRFLOW_HOME
airflow.cfg
найдите параметр logging_config_class
и задайте ему значение logging_conf.LOGGING_CONFIG
Готово. Теперь можно попробовать запустить DAG, где заведомо есть задача с ошибкой, уведомление в телеграм не заставит себя долго ждать.
Код с logging-конфигом и телеграм-обработчиком лежит в репозитории в папке config.
Можно, но в случае нужно в обработчик добавить соответствующие изменения.
А не подскажите какие необходимо внести изменения?
В конфиге logging вместо строки в chat_id передавать список, а в Handler классе соответственно получать этот список, и итерироваться по нему, отправляя сообщения каждому chat_id из этого списка.
Не хватает скринов с примерами сообщений об ошибках, а в остальном прикольно
Alexandr Vitkovskiy 8 Июнь 2021
А если я хочу, чтобы сообщения об ошибках получили несколько клиентов, могу ли я передать в CHAT_ID список?